vtcode_core/
mcp_client.rs

1//! MCP Client implementation
2//!
3//! This module provides a high-level abstraction over the rmcp library,
4//! managing MCP provider connections and tool execution.
5
6use crate::config::mcp::{
7    McpAllowListConfig, McpClientConfig, McpProviderConfig, McpTransportConfig,
8};
9use anyhow::{Context, Result};
10use async_trait::async_trait;
11use parking_lot::RwLock;
12use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
13use rmcp::{
14    ServiceExt,
15    handler::client::ClientHandler,
16    model::{
17        CallToolRequestParam, CallToolResult, ClientCapabilities, ClientInfo, Implementation,
18        ListToolsResult, LoggingLevel, LoggingMessageNotificationParam, RootsCapabilities,
19    },
20    transport::TokioChildProcess,
21};
22use serde_json::{Map, Value};
23use std::collections::HashMap;
24use std::future;
25use std::sync::Arc;
26use tokio::process::Command;
27use tokio::sync::Mutex;
28use tracing::{Level, debug, error, info, warn};
29
/// Client-side handler state for a single MCP provider.
///
/// Holds the provider name used to tag forwarded log events and the
/// `ClientInfo` that is returned from `get_info`.
#[derive(Clone)]
struct LoggingClientHandler {
    /// Provider name attached as the `provider` field on every forwarded log event.
    provider_name: String,
    /// Client identity and capabilities; cloned out on each `get_info` call.
    info: ClientInfo,
}
35
36impl LoggingClientHandler {
37    fn new(provider_name: &str) -> Self {
38        let mut info = ClientInfo::default();
39        info.capabilities = ClientCapabilities {
40            roots: Some(RootsCapabilities {
41                list_changed: Some(true),
42            }),
43            ..ClientCapabilities::default()
44        };
45        info.client_info = Implementation {
46            name: "vtcode".to_string(),
47            title: Some("VT Code MCP client".to_string()),
48            version: env!("CARGO_PKG_VERSION").to_string(),
49            icons: None,
50            website_url: Some("https://github.com/modelcontextprotocol".to_string()),
51        };
52
53        Self {
54            provider_name: provider_name.to_string(),
55            info,
56        }
57    }
58
59    fn handle_logging(&self, params: LoggingMessageNotificationParam) {
60        let payload = params.data;
61        let summary = payload
62            .get("message")
63            .and_then(Value::as_str)
64            .map(str::to_owned)
65            .unwrap_or_else(|| payload.to_string());
66
67        match params.level {
68            LoggingLevel::Debug => debug!(
69                provider = self.provider_name.as_str(),
70                summary = %summary,
71                payload = ?payload,
72                "MCP provider log"
73            ),
74            LoggingLevel::Info | LoggingLevel::Notice => info!(
75                provider = self.provider_name.as_str(),
76                summary = %summary,
77                payload = ?payload,
78                "MCP provider log"
79            ),
80            LoggingLevel::Warning => warn!(
81                provider = self.provider_name.as_str(),
82                summary = %summary,
83                payload = ?payload,
84                "MCP provider warning"
85            ),
86            LoggingLevel::Error
87            | LoggingLevel::Critical
88            | LoggingLevel::Alert
89            | LoggingLevel::Emergency => error!(
90                provider = self.provider_name.as_str(),
91                summary = %summary,
92                payload = ?payload,
93                "MCP provider error"
94            ),
95        }
96    }
97}
98
99impl ClientHandler for LoggingClientHandler {
100    fn on_logging_message(
101        &self,
102        params: LoggingMessageNotificationParam,
103        _context: rmcp::service::NotificationContext<rmcp::service::RoleClient>,
104    ) -> impl std::future::Future<Output = ()> + Send + '_ {
105        self.handle_logging(params);
106        future::ready(())
107    }
108
109    fn get_info(&self) -> ClientInfo {
110        self.info.clone()
111    }
112}
113
/// High-level MCP client that manages multiple providers
///
/// Owns the client configuration, the initialized providers, a cache of
/// running connections, the current allow list, and an index that remembers
/// which provider serves which tool.
pub struct McpClient {
    /// Configuration supplied to `new`; its `enabled` flag and provider list drive the client.
    config: McpClientConfig,
    /// Initialized providers keyed by provider name; filled in by `initialize`.
    pub providers: HashMap<String, Arc<McpProvider>>,
    /// Running provider connections keyed by provider name, behind an async mutex.
    active_connections: Arc<Mutex<HashMap<String, Arc<RunningMcpService>>>>,
    /// Current allow list; replaceable at runtime through `update_allowlist`.
    allowlist: Arc<RwLock<McpAllowListConfig>>,
    /// Maps a tool name to the name of the provider recorded for it.
    tool_provider_index: Arc<RwLock<HashMap<String, String>>>,
}
122
123impl McpClient {
124    /// Create a new MCP client with the given configuration
125    pub fn new(config: McpClientConfig) -> Self {
126        let allowlist = Arc::new(RwLock::new(config.allowlist.clone()));
127        Self {
128            config,
129            providers: HashMap::new(),
130            active_connections: Arc::new(Mutex::new(HashMap::new())),
131            allowlist,
132            tool_provider_index: Arc::new(RwLock::new(HashMap::new())),
133        }
134    }
135
136    fn record_tool_provider(&self, provider: &str, tool: &str) {
137        debug!("Recording tool '{}' -> provider '{}'", tool, provider);
138        self.tool_provider_index
139            .write()
140            .insert(tool.to_string(), provider.to_string());
141    }
142
143    /// Retrieve provider reference for a known tool name
144    pub fn provider_for_tool(&self, tool_name: &str) -> Option<String> {
145        let index = self.tool_provider_index.read();
146        if let Some(provider) = index.get(tool_name) {
147            // Validate that the provider still exists and is enabled
148            if self.providers.contains_key(provider) {
149                debug!("Found tool '{}' in provider '{}'", tool_name, provider);
150                Some(provider.clone())
151            } else {
152                debug!(
153                    "Tool '{}' references non-existent provider '{}'",
154                    tool_name, provider
155                );
156                None
157            }
158        } else {
159            debug!("Tool '{}' not found in provider index", tool_name);
160            None
161        }
162    }
163
164    /// Replace the in-memory MCP allow list with the provided configuration
165    pub fn update_allowlist(&self, allowlist: McpAllowListConfig) {
166        *self.allowlist.write() = allowlist;
167    }
168
169    /// Get a clone of the current allow list
170    pub fn current_allowlist(&self) -> McpAllowListConfig {
171        self.allowlist.read().clone()
172    }
173
174    fn format_tool_result(
175        provider_name: &str,
176        tool_name: &str,
177        result: CallToolResult,
178    ) -> Result<Value> {
179        let is_error = result.is_error.unwrap_or(false);
180        let text_summary = result
181            .content
182            .iter()
183            .find_map(|content| content.as_text().map(|text| text.text.clone()));
184
185        if is_error {
186            let detail = result
187                .structured_content
188                .as_ref()
189                .and_then(|value| value.get("message").and_then(Value::as_str))
190                .map(str::to_owned)
191                .or_else(|| {
192                    result
193                        .structured_content
194                        .as_ref()
195                        .map(|value| value.to_string())
196                })
197                .or(text_summary)
198                .unwrap_or_else(|| "Unknown MCP tool error".to_string());
199
200            return Err(anyhow::anyhow!(
201                "MCP tool '{}' on provider '{}' reported an error: {}",
202                tool_name,
203                provider_name,
204                detail
205            ));
206        }
207
208        let mut payload = Map::new();
209        payload.insert("provider".into(), Value::String(provider_name.to_string()));
210        payload.insert("tool".into(), Value::String(tool_name.to_string()));
211
212        if let Some(meta) = result.meta {
213            if let Ok(meta_value) = serde_json::to_value(&meta) {
214                if !meta_value.is_null() {
215                    payload.insert("meta".into(), meta_value);
216                }
217            }
218        }
219
220        if let Some(structured) = result.structured_content {
221            match structured {
222                Value::Object(mut object) => {
223                    object
224                        .entry("provider")
225                        .or_insert_with(|| Value::String(provider_name.to_string()));
226                    object
227                        .entry("tool")
228                        .or_insert_with(|| Value::String(tool_name.to_string()));
229
230                    if let Some(meta_value) = payload.remove("meta") {
231                        object.entry("meta").or_insert(meta_value);
232                    }
233
234                    return Ok(Value::Object(object));
235                }
236                other => {
237                    payload.insert("structured_content".into(), other);
238                }
239            }
240        }
241
242        if let Some(summary) = text_summary {
243            payload.insert("message".into(), Value::String(summary));
244        }
245
246        if !result.content.is_empty() {
247            if let Ok(content_value) = serde_json::to_value(&result.content) {
248                payload.insert("content".into(), content_value);
249            }
250        }
251
252        Ok(Value::Object(payload))
253    }
254
255    /// Initialize the MCP client and connect to configured providers
256    pub async fn initialize(&mut self) -> Result<()> {
257        if !self.config.enabled {
258            info!("MCP client is disabled in configuration");
259            return Ok(());
260        }
261
262        info!(
263            "Initializing MCP client with {} configured providers",
264            self.config.providers.len()
265        );
266
267        for provider_config in &self.config.providers {
268            if provider_config.enabled {
269                info!("Initializing MCP provider '{}'", provider_config.name);
270
271                match McpProvider::new(provider_config.clone()).await {
272                    Ok(provider) => {
273                        let provider = Arc::new(provider);
274                        self.providers
275                            .insert(provider_config.name.clone(), provider);
276                        info!(
277                            "Successfully initialized MCP provider '{}'",
278                            provider_config.name
279                        );
280                        self.audit_log(
281                            Some(provider_config.name.as_str()),
282                            "mcp.provider_initialized",
283                            Level::INFO,
284                            format!("Provider '{}' initialized", provider_config.name),
285                        );
286                    }
287                    Err(e) => {
288                        error!(
289                            "Failed to initialize MCP provider '{}': {}",
290                            provider_config.name, e
291                        );
292                        self.audit_log(
293                            Some(provider_config.name.as_str()),
294                            "mcp.provider_initialization_failed",
295                            Level::WARN,
296                            format!(
297                                "Failed to initialize provider '{}' due to error: {}",
298                                provider_config.name, e
299                            ),
300                        );
301                        // Continue with other providers instead of failing completely
302                        continue;
303                    }
304                }
305            } else {
306                debug!(
307                    "MCP provider '{}' is disabled, skipping",
308                    provider_config.name
309                );
310            }
311        }
312
313        info!(
314            "MCP client initialization complete. Active providers: {}",
315            self.providers.len()
316        );
317
318        // Clean up any providers with terminated processes
319        let _ = self.cleanup_dead_providers().await;
320
321        Ok(())
322    }
323
324    /// Kill any remaining MCP provider processes that may not have terminated properly
325    async fn kill_remaining_mcp_processes(&self) {
326        debug!("Checking for remaining MCP provider processes to clean up");
327
328        // Try to find and kill any remaining MCP provider processes
329        // This is a fallback for cases where the rmcp library doesn't properly terminate processes
330        let process_cleanup_attempts = tokio::time::timeout(
331            tokio::time::Duration::from_secs(5),
332            self.attempt_process_cleanup(),
333        )
334        .await;
335
336        match process_cleanup_attempts {
337            Ok(Ok(cleaned_count)) => {
338                if cleaned_count > 0 {
339                    info!(
340                        "Cleaned up {} remaining MCP provider processes",
341                        cleaned_count
342                    );
343                    self.audit_log(
344                        None,
345                        "mcp.process_cleanup",
346                        Level::INFO,
347                        format!(
348                            "Cleaned up {} remaining MCP provider processes",
349                            cleaned_count
350                        ),
351                    );
352                } else {
353                    debug!("No remaining MCP provider processes to clean up");
354                }
355            }
356            Ok(Err(e)) => {
357                warn!("Error during MCP process cleanup (non-critical): {}", e);
358                self.audit_log(
359                    None,
360                    "mcp.process_cleanup_error",
361                    Level::WARN,
362                    format!("Error during MCP process cleanup: {}", e),
363                );
364            }
365            Err(_) => {
366                warn!("MCP process cleanup timed out (non-critical)");
367                self.audit_log(
368                    None,
369                    "mcp.process_cleanup_timeout",
370                    Level::WARN,
371                    "MCP process cleanup timed out".to_string(),
372                );
373            }
374        }
375    }
376
377    /// Attempt to clean up MCP provider processes by finding and killing them
378    async fn attempt_process_cleanup(&self) -> Result<usize> {
379        use tokio::process::Command as TokioCommand;
380
381        let mut cleaned_count = 0;
382
383        // Get current process ID to avoid killing ourselves
384        let current_pid = std::process::id();
385
386        // Try to find MCP provider processes and kill them
387        // This is a best-effort cleanup for processes that may have escaped proper termination
388        for provider_config in &self.config.providers {
389            if !provider_config.enabled {
390                continue;
391            }
392
393            let provider_name = &provider_config.name;
394            debug!("Attempting cleanup for MCP provider '{}'", provider_name);
395
396            // Try multiple approaches to find and kill processes
397            let mut provider_cleaned = 0;
398
399            // Approach 1: Use pgrep with command pattern
400            if let Ok(output) = TokioCommand::new("pgrep")
401                .args(["-f", &format!("mcp-server-{}", provider_name)])
402                .output()
403                .await
404            {
405                if output.status.success() {
406                    let pids = String::from_utf8_lossy(&output.stdout);
407                    for pid_str in pids.lines() {
408                        if let Ok(pid) = pid_str.trim().parse::<u32>() {
409                            if pid != current_pid && pid > 0 {
410                                if self.kill_process_gracefully(pid).await {
411                                    provider_cleaned += 1;
412                                }
413                            }
414                        }
415                    }
416                }
417            }
418
419            // Approach 2: If pgrep failed, try ps with grep
420            if provider_cleaned == 0 {
421                if let Ok(output) = TokioCommand::new("ps").args(["aux"]).output().await {
422                    if output.status.success() {
423                        let processes = String::from_utf8_lossy(&output.stdout);
424                        for line in processes.lines() {
425                            // Look for lines containing the provider name and MCP-related terms
426                            if line.contains(provider_name)
427                                && (line.contains("mcp")
428                                    || line.contains("node")
429                                    || line.contains("python"))
430                            {
431                                // Extract PID from ps output (first column)
432                                let parts: Vec<&str> = line.split_whitespace().collect();
433                                if let Some(pid_str) = parts.first() {
434                                    if let Ok(pid) = pid_str.parse::<u32>() {
435                                        if pid != current_pid && pid > 0 {
436                                            if self.kill_process_gracefully(pid).await {
437                                                provider_cleaned += 1;
438                                            }
439                                        }
440                                    }
441                                }
442                            }
443                        }
444                    }
445                }
446            }
447
448            if provider_cleaned > 0 {
449                debug!(
450                    "Cleaned up {} processes for MCP provider '{}'",
451                    provider_cleaned, provider_name
452                );
453                cleaned_count += provider_cleaned;
454                // Clear the tool provider index when we kill processes
455                self.tool_provider_index.write().clear();
456            }
457        }
458
459        Ok(cleaned_count)
460    }
461
462    /// Kill a process gracefully with TERM first, then KILL if needed
463    async fn kill_process_gracefully(&self, pid: u32) -> bool {
464        debug!("Killing process {} gracefully", pid);
465
466        // Try graceful termination first
467        let _ = tokio::process::Command::new("kill")
468            .args(["-TERM", &pid.to_string()])
469            .output()
470            .await;
471
472        // Give it a moment to terminate gracefully
473        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
474
475        // Check if process is still running
476        if let Ok(output) = tokio::process::Command::new("kill")
477            .args(["-0", &pid.to_string()]) // Check if process exists
478            .output()
479            .await
480        {
481            if output.status.success() {
482                // Process still exists, force kill it
483                debug!("Process {} still running, force killing", pid);
484                let _ = tokio::process::Command::new("kill")
485                    .args(["-KILL", &pid.to_string()])
486                    .output()
487                    .await;
488                true
489            } else {
490                // Process already terminated
491                debug!("Process {} already terminated", pid);
492                true
493            }
494        } else {
495            // kill -0 command failed, assume process doesn't exist
496            debug!("Process {} check failed, assuming terminated", pid);
497            true
498        }
499    }
500
501    /// Clean up providers with terminated processes
502    pub async fn cleanup_dead_providers(&self) -> Result<()> {
503        let mut dead_providers = Vec::new();
504
505        for (provider_name, provider) in &self.providers {
506            // Try to check if provider is still alive by attempting a quick operation
507            let provider_health_check = tokio::time::timeout(
508                tokio::time::Duration::from_secs(2),
509                provider.has_tool("ping"),
510            )
511            .await;
512
513            match provider_health_check {
514                Ok(Ok(_)) => {
515                    // Provider is responsive
516                    debug!("MCP provider '{}' is healthy", provider_name);
517                }
518                Ok(Err(e)) => {
519                    let error_msg = e.to_string();
520                    if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
521                        warn!(
522                            "MCP provider '{}' has terminated process, marking for cleanup",
523                            provider_name
524                        );
525                        dead_providers.push(provider_name.clone());
526                    } else {
527                        debug!(
528                            "MCP provider '{}' returned error but process may be alive: {}",
529                            provider_name, e
530                        );
531                    }
532                }
533                Err(_timeout) => {
534                    warn!(
535                        "MCP provider '{}' health check timed out, may be unresponsive",
536                        provider_name
537                    );
538                    // Don't mark as dead on timeout, might just be slow
539                }
540            }
541        }
542
543        // Note: In a real implementation, we'd want to remove dead providers from the providers map
544        // For now, we'll just log them
545        if !dead_providers.is_empty() {
546            warn!(
547                "Found {} dead MCP providers: {:?}",
548                dead_providers.len(),
549                dead_providers
550            );
551        }
552
553        Ok(())
554    }
555
556    /// List all available MCP tools across all providers
557    pub async fn list_tools(&self) -> Result<Vec<McpToolInfo>> {
558        if !self.config.enabled {
559            debug!("MCP client is disabled, returning empty tool list");
560            return Ok(Vec::new());
561        }
562
563        if self.providers.is_empty() {
564            debug!("No MCP providers configured, returning empty tool list");
565            return Ok(Vec::new());
566        }
567
568        let mut all_tools = Vec::new();
569        let mut errors = Vec::new();
570
571        let allowlist_snapshot = self.allowlist.read().clone();
572
573        for (provider_name, provider) in &self.providers {
574            let provider_id = provider_name.as_str();
575            match tokio::time::timeout(tokio::time::Duration::from_secs(15), provider.list_tools())
576                .await
577            {
578                Ok(Ok(tools)) => {
579                    debug!(
580                        "Provider '{}' has {} tools",
581                        provider_name,
582                        tools.tools.len()
583                    );
584
585                    for tool in tools.tools {
586                        let tool_name = tool.name.as_ref();
587
588                        if allowlist_snapshot.is_tool_allowed(provider_id, tool_name) {
589                            self.record_tool_provider(provider_id, tool_name);
590                            all_tools.push(McpToolInfo {
591                                name: tool_name.to_string(),
592                                description: tool.description.unwrap_or_default().to_string(),
593                                provider: provider_name.clone(),
594                                input_schema: serde_json::to_value(&*tool.input_schema)
595                                    .unwrap_or(Value::Null),
596                            });
597                        } else {
598                            self.audit_log(
599                                Some(provider_id),
600                                "mcp.tool_filtered",
601                                Level::DEBUG,
602                                format!(
603                                    "Filtered tool '{}' from provider '{}' due to allow list",
604                                    tool_name, provider_id
605                                ),
606                            );
607                        }
608                    }
609                }
610                Ok(Err(e)) => {
611                    let error_msg = e.to_string();
612                    if error_msg.contains("No such process")
613                        || error_msg.contains("ESRCH")
614                        || error_msg.contains("EPIPE")
615                        || error_msg.contains("Broken pipe")
616                        || error_msg.contains("write EPIPE")
617                    {
618                        debug!(
619                            "MCP provider '{}' process/pipe terminated during tool listing (normal during shutdown): {}",
620                            provider_name, e
621                        );
622                    } else {
623                        warn!(
624                            "Failed to list tools for provider '{}': {}",
625                            provider_name, e
626                        );
627                    }
628                    let error_msg = format!(
629                        "Failed to list tools for provider '{}': {}",
630                        provider_name, e
631                    );
632                    errors.push(error_msg);
633                }
634                Err(_timeout) => {
635                    warn!("MCP provider '{}' tool listing timed out", provider_name);
636                    let error_msg =
637                        format!("Tool listing timeout for provider '{}'", provider_name);
638                    errors.push(error_msg);
639                }
640            }
641        }
642
643        if !errors.is_empty() {
644            warn!(
645                "Encountered {} errors while listing MCP tools: {:?}",
646                errors.len(),
647                errors
648            );
649        }
650
651        info!(
652            "Found {} total MCP tools across all providers",
653            all_tools.len()
654        );
655        Ok(all_tools)
656    }
657
658    /// Execute a tool call on the appropriate MCP provider
659    pub async fn execute_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
660        if !self.config.enabled {
661            return Err(anyhow::anyhow!("MCP client is disabled"));
662        }
663
664        if self.providers.is_empty() {
665            return Err(anyhow::anyhow!("No MCP providers configured"));
666        }
667
668        let tool_name_owned = tool_name.to_string();
669        debug!("Executing MCP tool '{}' with args: {}", tool_name, args);
670
671        // Find the provider that has this tool
672        let provider_name = {
673            let mut found_provider = None;
674            let mut provider_errors = Vec::new();
675
676            for (name, provider) in &self.providers {
677                match provider.has_tool(&tool_name_owned).await {
678                    Ok(true) => {
679                        found_provider = Some(name.clone());
680                        break;
681                    }
682                    Ok(false) => continue,
683                    Err(e) => {
684                        let error_msg = format!(
685                            "Error checking tool availability for provider '{}': {}",
686                            name, e
687                        );
688                        warn!("{}", error_msg);
689                        provider_errors.push(error_msg);
690                    }
691                }
692            }
693
694            found_provider.ok_or_else(|| {
695                let error_msg = format!(
696                    "Tool '{}' not found in any MCP provider. Provider errors: {:?}",
697                    tool_name, provider_errors
698                );
699                anyhow::anyhow!(error_msg)
700            })?
701        };
702
703        debug!("Found tool '{}' in provider '{}'", tool_name, provider_name);
704
705        if !self
706            .allowlist
707            .read()
708            .is_tool_allowed(provider_name.as_str(), tool_name)
709        {
710            let message = format!(
711                "Tool '{}' from provider '{}' is not permitted by the MCP allow list",
712                tool_name, provider_name
713            );
714            self.audit_log(
715                Some(provider_name.as_str()),
716                "mcp.tool_denied",
717                Level::WARN,
718                message.as_str(),
719            );
720            return Err(anyhow::anyhow!(message));
721        }
722
723        self.record_tool_provider(provider_name.as_str(), tool_name);
724
725        let provider = self.providers.get(&provider_name).ok_or_else(|| {
726            anyhow::anyhow!("Provider '{}' not found after discovery", provider_name)
727        })?;
728
729        // Get or create connection for this provider
730        let connection = match self.get_or_create_connection(provider).await {
731            Ok(conn) => conn,
732            Err(e) => {
733                error!(
734                    "Failed to establish connection to provider '{}': {}",
735                    provider_name, e
736                );
737                return Err(e);
738            }
739        };
740
741        // Execute the tool call
742        match connection
743            .call_tool(CallToolRequestParam {
744                name: tool_name_owned.into(),
745                arguments: args.as_object().cloned(),
746            })
747            .await
748        {
749            Ok(result) => match Self::format_tool_result(provider_name.as_str(), tool_name, result)
750            {
751                Ok(serialized) => {
752                    info!(
753                        "Successfully executed MCP tool '{}' via provider '{}'",
754                        tool_name, provider_name
755                    );
756                    self.audit_log(
757                        Some(provider_name.as_str()),
758                        "mcp.tool_execution",
759                        Level::INFO,
760                        format!(
761                            "Successfully executed MCP tool '{}' via provider '{}'",
762                            tool_name, provider_name
763                        ),
764                    );
765                    Ok(serialized)
766                }
767                Err(err) => {
768                    let err_message = err.to_string();
769                    warn!(
770                        "MCP tool '{}' via provider '{}' returned an error payload: {}",
771                        tool_name, provider_name, err_message
772                    );
773                    self.audit_log(
774                        Some(provider_name.as_str()),
775                        "mcp.tool_failed",
776                        Level::WARN,
777                        format!(
778                            "MCP tool '{}' via provider '{}' returned an error payload: {}",
779                            tool_name, provider_name, err_message
780                        ),
781                    );
782                    Err(err)
783                }
784            },
785            Err(e) => {
786                let error_message = e.to_string();
787
788                error!(
789                    "MCP tool '{}' failed on provider '{}': {}",
790                    tool_name, provider_name, error_message
791                );
792                self.audit_log(
793                    Some(provider_name.as_str()),
794                    "mcp.tool_failed",
795                    Level::WARN,
796                    format!(
797                        "MCP tool '{}' failed on provider '{}': {}",
798                        tool_name, provider_name, error_message
799                    ),
800                );
801
802                // Handle different types of connection errors
803                if error_message.contains("EPIPE")
804                    || error_message.contains("Broken pipe")
805                    || error_message.contains("write EPIPE")
806                    || error_message.contains("No such process")
807                    || error_message.contains("ESRCH")
808                {
809                    // Drop the stale connection so a fresh process can be created next time
810                    let mut connections = self.active_connections.lock().await;
811                    connections.remove(&provider_name);
812                    // Remove cached tool-provider mapping so it is refreshed on reconnect
813                    self.tool_provider_index
814                        .write()
815                        .retain(|_, provider| provider != &provider_name);
816
817                    return Err(anyhow::anyhow!(
818                        "MCP provider '{}' disconnected unexpectedly while executing '{}'. The provider process may have terminated. Try re-running the command to restart the provider.",
819                        provider_name,
820                        tool_name
821                    ));
822                } else if error_message.contains("timeout") || error_message.contains("Timeout") {
823                    // Drop the stale connection on timeout
824                    let mut connections = self.active_connections.lock().await;
825                    connections.remove(&provider_name);
826
827                    return Err(anyhow::anyhow!(
828                        "MCP tool '{}' execution timed out on provider '{}'. The provider may be unresponsive. Try re-running the command.",
829                        tool_name,
830                        provider_name
831                    ));
832                } else if error_message.contains("permission")
833                    || error_message.contains("Permission denied")
834                {
835                    return Err(anyhow::anyhow!(
836                        "Permission denied executing MCP tool '{}' on provider '{}': {}",
837                        tool_name,
838                        provider_name,
839                        error_message
840                    ));
841                } else if error_message.contains("network")
842                    || error_message.contains("Connection refused")
843                {
844                    return Err(anyhow::anyhow!(
845                        "Network error executing MCP tool '{}' on provider '{}': {}",
846                        tool_name,
847                        provider_name,
848                        error_message
849                    ));
850                }
851
852                Err(anyhow::anyhow!(
853                    "MCP tool execution failed: {}",
854                    error_message
855                ))
856            }
857        }
858    }
859
860    /// Get or create a connection to the specified provider
861    async fn get_or_create_connection(
862        &self,
863        provider: &McpProvider,
864    ) -> Result<Arc<RunningMcpService>> {
865        let provider_name = &provider.config.name;
866        debug!("Getting connection for MCP provider '{}'", provider_name);
867
868        let mut connections = self.active_connections.lock().await;
869
870        if !connections.contains_key(provider_name) {
871            debug!("Creating new connection for provider '{}'", provider_name);
872
873            // Add timeout for connection creation
874            match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
875                .await
876            {
877                Ok(Ok(connection)) => {
878                    let connection = Arc::new(connection);
879                    connections.insert(provider_name.clone(), Arc::clone(&connection));
880                    debug!(
881                        "Successfully created connection for provider '{}'",
882                        provider_name
883                    );
884                    Ok(connection)
885                }
886                Ok(Err(e)) => {
887                    let error_msg = e.to_string();
888                    if error_msg.contains("HTTP MCP server support") {
889                        warn!(
890                            "Provider '{}' uses HTTP transport which is not fully implemented: {}",
891                            provider_name, e
892                        );
893                        Err(anyhow::anyhow!(
894                            "HTTP MCP transport not fully implemented for provider '{}'. Consider using stdio transport instead.",
895                            provider_name
896                        ))
897                    } else if error_msg.contains("command not found")
898                        || error_msg.contains("No such file")
899                    {
900                        error!("Command not found for provider '{}': {}", provider_name, e);
901                        Err(anyhow::anyhow!(
902                            "Command '{}' not found for MCP provider '{}'. Please ensure the MCP server is installed and accessible.",
903                            self.config
904                                .providers
905                                .iter()
906                                .find(|p| p.name == *provider_name)
907                                .map(|p| match &p.transport {
908                                    McpTransportConfig::Stdio(stdio) => stdio.command.as_str(),
909                                    _ => "unknown",
910                                })
911                                .unwrap_or("unknown"),
912                            provider_name
913                        ))
914                    } else if error_msg.contains("permission")
915                        || error_msg.contains("Permission denied")
916                    {
917                        error!(
918                            "Permission denied creating connection for provider '{}': {}",
919                            provider_name, e
920                        );
921                        Err(anyhow::anyhow!(
922                            "Permission denied executing MCP server for provider '{}': {}",
923                            provider_name,
924                            error_msg
925                        ))
926                    } else {
927                        error!(
928                            "Failed to create connection for provider '{}': {}",
929                            provider_name, e
930                        );
931                        Err(anyhow::anyhow!(
932                            "Failed to create connection for MCP provider '{}': {}",
933                            provider_name,
934                            error_msg
935                        ))
936                    }
937                }
938                Err(_timeout) => {
939                    error!(
940                        "Connection creation timed out for provider '{}' after {} seconds",
941                        provider_name, 30
942                    );
943                    Err(anyhow::anyhow!(
944                        "Connection creation timed out for MCP provider '{}' after {} seconds. The provider may be slow to start or unresponsive.",
945                        provider_name,
946                        30
947                    ))
948                }
949            }
950        } else {
951            // Validate existing connection is still healthy
952            let existing_connection = connections.get(provider_name).unwrap().clone();
953
954            // Quick health check - try to use the connection
955            if let Err(e) = self
956                .validate_connection(provider_name, &existing_connection)
957                .await
958            {
959                debug!(
960                    "Existing connection for provider '{}' is unhealthy, creating new one: {}",
961                    provider_name, e
962                );
963
964                // Remove the unhealthy connection
965                connections.remove(provider_name);
966
967                // Create new connection
968                match tokio::time::timeout(tokio::time::Duration::from_secs(30), provider.connect())
969                    .await
970                {
971                    Ok(Ok(connection)) => {
972                        let connection = Arc::new(connection);
973                        connections.insert(provider_name.clone(), Arc::clone(&connection));
974                        debug!(
975                            "Successfully created new connection for provider '{}'",
976                            provider_name
977                        );
978                        Ok(connection)
979                    }
980                    Ok(Err(e)) => {
981                        error!(
982                            "Failed to create replacement connection for provider '{}': {}",
983                            provider_name, e
984                        );
985                        Err(e)
986                    }
987                    Err(_timeout) => {
988                        error!(
989                            "Replacement connection creation timed out for provider '{}'",
990                            provider_name
991                        );
992                        Err(anyhow::anyhow!(
993                            "Replacement connection timeout for provider '{}'",
994                            provider_name
995                        ))
996                    }
997                }
998            } else {
999                debug!(
1000                    "Reusing existing healthy connection for provider '{}'",
1001                    provider_name
1002                );
1003                Ok(existing_connection)
1004            }
1005        }
1006    }
1007
1008    /// Validate that an existing connection is still healthy
1009    async fn validate_connection(
1010        &self,
1011        provider_name: &str,
1012        connection: &RunningMcpService,
1013    ) -> Result<()> {
1014        debug!(
1015            "Validating connection health for provider '{}'",
1016            provider_name
1017        );
1018
1019        // Try to ping the connection with a simple tool check
1020        // Use a very short timeout to avoid blocking
1021        match tokio::time::timeout(
1022            tokio::time::Duration::from_secs(2),
1023            connection.list_tools(Default::default()),
1024        )
1025        .await
1026        {
1027            Ok(Ok(_)) => {
1028                debug!(
1029                    "Connection health check passed for provider '{}'",
1030                    provider_name
1031                );
1032                Ok(())
1033            }
1034            Ok(Err(e)) => {
1035                let error_msg = e.to_string();
1036                debug!(
1037                    "Connection health check failed for provider '{}': {}",
1038                    provider_name, error_msg
1039                );
1040                Err(anyhow::anyhow!(
1041                    "Connection health check failed for provider '{}': {}",
1042                    provider_name,
1043                    error_msg
1044                ))
1045            }
1046            Err(_) => {
1047                debug!(
1048                    "Connection health check timed out for provider '{}'",
1049                    provider_name
1050                );
1051                Err(anyhow::anyhow!(
1052                    "Connection health check timed out for provider '{}'",
1053                    provider_name
1054                ))
1055            }
1056        }
1057    }
1058
1059    fn audit_log(
1060        &self,
1061        provider: Option<&str>,
1062        channel: &str,
1063        level: Level,
1064        message: impl AsRef<str>,
1065    ) {
1066        let logging_allowed = {
1067            let allowlist = self.allowlist.read();
1068            allowlist.is_logging_channel_allowed(provider, channel)
1069        };
1070
1071        if !logging_allowed {
1072            return;
1073        }
1074
1075        let msg = message.as_ref();
1076        match level {
1077            Level::ERROR => error!(target: "mcp", "[{}] {}", channel, msg),
1078            Level::WARN => warn!(target: "mcp", "[{}] {}", channel, msg),
1079            Level::INFO => info!(target: "mcp", "[{}] {}", channel, msg),
1080            Level::DEBUG => debug!(target: "mcp", "[{}] {}", channel, msg),
1081            _ => debug!(target: "mcp", "[{}] {}", channel, msg),
1082        }
1083    }
1084
1085    /// Shutdown all MCP connections
1086    pub async fn shutdown(&self) -> Result<()> {
1087        info!("Shutting down MCP client and all provider connections");
1088
1089        let mut connections = self.active_connections.lock().await;
1090
1091        if connections.is_empty() {
1092            info!("No active MCP connections to shutdown");
1093            return Ok(());
1094        }
1095
1096        info!(
1097            "Shutting down {} MCP provider connections",
1098            connections.len()
1099        );
1100
1101        let cancellation_tokens: Vec<(String, rmcp::service::RunningServiceCancellationToken)> =
1102            connections
1103                .iter()
1104                .map(|(provider_name, connection)| {
1105                    debug!(
1106                        "Initiating graceful shutdown for MCP provider '{}'",
1107                        provider_name
1108                    );
1109                    (provider_name.clone(), connection.cancellation_token())
1110                })
1111                .collect();
1112
1113        for (provider_name, token) in cancellation_tokens {
1114            debug!(
1115                "Cancelling MCP provider '{}' via cancellation token",
1116                provider_name
1117            );
1118            token.cancel();
1119        }
1120
1121        // Give connections a grace period to shutdown cleanly
1122        let shutdown_timeout = tokio::time::Duration::from_secs(5);
1123        let shutdown_start = std::time::Instant::now();
1124
1125        // Wait for graceful shutdown or timeout
1126        while shutdown_start.elapsed() < shutdown_timeout && !connections.is_empty() {
1127            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1128
1129            // Remove any connections that have been dropped
1130            connections.retain(|_, connection| {
1131                // Check if the connection is still valid
1132                Arc::strong_count(connection) > 1 // At least our reference and possibly others
1133            });
1134        }
1135
1136        // Force shutdown any remaining connections
1137        let remaining_count = connections.len();
1138        if remaining_count > 0 {
1139            warn!(
1140                "{} MCP provider connections did not shutdown gracefully within timeout, forcing shutdown",
1141                remaining_count
1142            );
1143        }
1144
1145        // Clear all connections (this will drop them and should terminate processes)
1146        let drained_connections: Vec<_> = connections.drain().collect();
1147        drop(connections);
1148
1149        for (provider_name, connection) in drained_connections {
1150            debug!("Force shutting down MCP provider '{}'", provider_name);
1151
1152            if let Ok(connection) = Arc::try_unwrap(connection) {
1153                debug!(
1154                    "Awaiting MCP provider '{}' task cancellation after graceful request",
1155                    provider_name
1156                );
1157
1158                match connection.cancel().await {
1159                    Ok(quit_reason) => {
1160                        debug!(
1161                            "MCP provider '{}' cancellation completed with reason: {:?}",
1162                            provider_name, quit_reason
1163                        );
1164                    }
1165                    Err(err) => {
1166                        debug!(
1167                            "MCP provider '{}' cancellation join error (non-critical): {}",
1168                            provider_name, err
1169                        );
1170                    }
1171                }
1172            } else {
1173                debug!(
1174                    "Additional references exist for MCP provider '{}'; dropping without awaiting",
1175                    provider_name
1176                );
1177            }
1178        }
1179
1180        // Give processes time to terminate gracefully
1181        tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
1182
1183        // Additional cleanup: try to kill any remaining MCP provider processes
1184        // This handles cases where the rmcp library doesn't properly terminate processes
1185        self.kill_remaining_mcp_processes().await;
1186
1187        info!("MCP client shutdown complete");
1188        Ok(())
1189    }
1190}
1191
1192/// Information about an MCP tool
1193#[derive(Debug, Clone)]
1194pub struct McpToolInfo {
1195    pub name: String,
1196    pub description: String,
1197    pub provider: String,
1198    pub input_schema: Value,
1199}
1200
1201/// Individual MCP provider wrapper
1202pub struct McpProvider {
1203    config: McpProviderConfig,
1204    tools_cache: Arc<Mutex<Option<ListToolsResult>>>,
1205}
1206
1207impl McpProvider {
1208    /// Create a new MCP provider
1209    pub async fn new(config: McpProviderConfig) -> Result<Self> {
1210        Ok(Self {
1211            config,
1212            tools_cache: Arc::new(Mutex::new(None)),
1213        })
1214    }
1215
1216    /// List tools available from this provider
1217    pub async fn list_tools(&self) -> Result<ListToolsResult> {
1218        let provider_name = &self.config.name;
1219        debug!("Listing tools for MCP provider '{}'", provider_name);
1220
1221        // Check cache first
1222        {
1223            let cache = self.tools_cache.lock().await;
1224            if let Some(cached) = cache.as_ref() {
1225                debug!("Using cached tools for provider '{}'", provider_name);
1226                return Ok(cached.clone());
1227            }
1228        }
1229
1230        debug!("Connecting to provider '{}' to fetch tools", provider_name);
1231
1232        // Connect and get tools
1233        match self.connect().await {
1234            Ok(connection) => {
1235                match connection.list_tools(Default::default()).await {
1236                    Ok(tools) => {
1237                        debug!(
1238                            "Found {} tools for provider '{}'",
1239                            tools.tools.len(),
1240                            provider_name
1241                        );
1242
1243                        // Cache the result
1244                        {
1245                            let mut cache = self.tools_cache.lock().await;
1246                            *cache = Some(tools.clone());
1247                        }
1248
1249                        Ok(tools)
1250                    }
1251                    Err(e) => {
1252                        error!(
1253                            "Failed to list tools for provider '{}': {}",
1254                            provider_name, e
1255                        );
1256                        Err(anyhow::anyhow!("Failed to list tools: {}", e))
1257                    }
1258                }
1259            }
1260            Err(e) => {
1261                error!("Failed to connect to provider '{}': {}", provider_name, e);
1262                Err(e)
1263            }
1264        }
1265    }
1266
1267    /// Check if this provider has a specific tool
1268    pub async fn has_tool(&self, tool_name: &str) -> Result<bool> {
1269        let provider_name = &self.config.name;
1270        debug!(
1271            "Checking if provider '{}' has tool '{}'",
1272            provider_name, tool_name
1273        );
1274
1275        match tokio::time::timeout(tokio::time::Duration::from_secs(10), self.list_tools()).await {
1276            Ok(Ok(tools)) => {
1277                let has_tool = tools.tools.iter().any(|tool| tool.name == tool_name);
1278                debug!(
1279                    "Provider '{}' {} tool '{}'",
1280                    provider_name,
1281                    if has_tool { "has" } else { "does not have" },
1282                    tool_name
1283                );
1284                Ok(has_tool)
1285            }
1286            Ok(Err(e)) => {
1287                let error_msg = e.to_string();
1288                if error_msg.contains("No such process")
1289                    || error_msg.contains("ESRCH")
1290                    || error_msg.contains("EPIPE")
1291                    || error_msg.contains("Broken pipe")
1292                    || error_msg.contains("write EPIPE")
1293                {
1294                    debug!(
1295                        "MCP provider '{}' process/pipe terminated during tool check (normal during shutdown): {}",
1296                        provider_name, e
1297                    );
1298                } else {
1299                    warn!(
1300                        "Failed to check tool availability for provider '{}': {}",
1301                        provider_name, e
1302                    );
1303                }
1304                Err(e)
1305            }
1306            Err(_timeout) => {
1307                warn!("MCP provider '{}' tool check timed out", provider_name);
1308                Err(anyhow::anyhow!("Tool availability check timeout"))
1309            }
1310        }
1311    }
1312
1313    /// Connect to this MCP provider
1314    async fn connect(&self) -> Result<RunningMcpService> {
1315        let provider_name = &self.config.name;
1316        info!("Connecting to MCP provider '{}'", provider_name);
1317
1318        match &self.config.transport {
1319            McpTransportConfig::Stdio(stdio_config) => {
1320                debug!("Using stdio transport for provider '{}'", provider_name);
1321                self.connect_stdio(stdio_config).await
1322            }
1323            McpTransportConfig::Http(http_config) => {
1324                debug!("Using HTTP transport for provider '{}'", provider_name);
1325                self.connect_http(http_config).await
1326            }
1327        }
1328    }
1329
1330    /// Connect using HTTP transport
1331    async fn connect_http(
1332        &self,
1333        config: &crate::config::mcp::McpHttpServerConfig,
1334    ) -> Result<RunningMcpService> {
1335        let provider_name = &self.config.name;
1336        debug!(
1337            "Setting up HTTP connection for provider '{}'",
1338            provider_name
1339        );
1340
1341        // Build the HTTP client with proper headers
1342        let mut headers = HeaderMap::new();
1343        headers.insert("Content-Type", "application/json".parse().unwrap());
1344
1345        // Add API key if provided
1346        if let Some(api_key_env) = &config.api_key_env {
1347            if let Ok(api_key) = std::env::var(api_key_env) {
1348                headers.insert(
1349                    "Authorization",
1350                    format!("Bearer {}", api_key).parse().unwrap(),
1351                );
1352            } else {
1353                warn!(
1354                    "API key environment variable '{}' not found for provider '{}'",
1355                    api_key_env, provider_name
1356                );
1357            }
1358        }
1359
1360        // Add custom headers
1361        for (key, value) in &config.headers {
1362            if let (Ok(header_name), Ok(header_value)) =
1363                (key.parse::<HeaderName>(), value.parse::<HeaderValue>())
1364            {
1365                headers.insert(header_name, header_value);
1366            }
1367        }
1368
1369        let client = reqwest::Client::builder()
1370            .default_headers(headers)
1371            .timeout(std::time::Duration::from_secs(30))
1372            .build()
1373            .context("Failed to build HTTP client")?;
1374
1375        // Test basic connectivity first
1376        debug!(
1377            "Testing HTTP MCP server connectivity at '{}'",
1378            config.endpoint
1379        );
1380
1381        match client.get(&config.endpoint).send().await {
1382            Ok(response) => {
1383                let status = response.status();
1384                if status.is_success() {
1385                    debug!(
1386                        "HTTP MCP server at '{}' is reachable (status: {})",
1387                        config.endpoint, status
1388                    );
1389
1390                    // Check if the server supports MCP by looking for the MCP endpoint
1391                    // According to MCP spec, servers should expose tools at /mcp endpoint
1392                    let mcp_endpoint = if config.endpoint.ends_with('/') {
1393                        format!("{}mcp", config.endpoint)
1394                    } else {
1395                        format!("{}/mcp", config.endpoint)
1396                    };
1397
1398                    debug!("Attempting to connect to MCP endpoint: {}", mcp_endpoint);
1399
1400                    // Try to connect to the MCP endpoint
1401                    match client.get(&mcp_endpoint).send().await {
1402                        Ok(mcp_response) => {
1403                            if mcp_response.status().is_success() {
1404                                debug!(
1405                                    "MCP endpoint '{}' is available (status: {})",
1406                                    mcp_endpoint,
1407                                    mcp_response.status()
1408                                );
1409
1410                                // For now, return an error indicating this needs full streamable HTTP implementation
1411                                // A complete implementation would use Server-Sent Events (SSE) for streaming MCP
1412                                Err(anyhow::anyhow!(
1413                                    "HTTP MCP server detected at '{}' but full streamable HTTP implementation is required. \
1414                                     MCP endpoint is available at '{}'. \
1415                                     Consider using stdio transport or implement HTTP streaming support with Server-Sent Events.",
1416                                    config.endpoint,
1417                                    mcp_endpoint
1418                                ))
1419                            } else {
1420                                debug!(
1421                                    "MCP endpoint '{}' returned status: {}",
1422                                    mcp_endpoint,
1423                                    mcp_response.status()
1424                                );
1425                                Err(anyhow::anyhow!(
1426                                    "HTTP MCP server at '{}' does not support MCP protocol. \
1427                                     Expected MCP endpoint at '{}' but got status: {}. \
1428                                     Consider using stdio transport instead.",
1429                                    config.endpoint,
1430                                    mcp_endpoint,
1431                                    mcp_response.status()
1432                                ))
1433                            }
1434                        }
1435                        Err(e) => {
1436                            let error_msg = e.to_string();
1437                            debug!(
1438                                "Failed to connect to MCP endpoint '{}': {}",
1439                                mcp_endpoint, error_msg
1440                            );
1441
1442                            Err(anyhow::anyhow!(
1443                                "HTTP MCP server at '{}' does not properly support MCP protocol. \
1444                                 Could not connect to MCP endpoint at '{}': {}. \
1445                                 Consider using stdio transport instead.",
1446                                config.endpoint,
1447                                mcp_endpoint,
1448                                error_msg
1449                            ))
1450                        }
1451                    }
1452                } else {
1453                    Err(anyhow::anyhow!(
1454                        "HTTP MCP server returned error status: {} at endpoint: {}",
1455                        status,
1456                        config.endpoint
1457                    ))
1458                }
1459            }
1460            Err(e) => {
1461                let error_msg = e.to_string();
1462                if error_msg.contains("dns") || error_msg.contains("Name resolution") {
1463                    Err(anyhow::anyhow!(
1464                        "HTTP MCP server DNS resolution failed for '{}': {}",
1465                        config.endpoint,
1466                        e
1467                    ))
1468                } else if error_msg.contains("Connection refused") || error_msg.contains("connect")
1469                {
1470                    Err(anyhow::anyhow!(
1471                        "HTTP MCP server connection failed for '{}': {}",
1472                        config.endpoint,
1473                        e
1474                    ))
1475                } else {
1476                    Err(anyhow::anyhow!(
1477                        "HTTP MCP server error for '{}': {}",
1478                        config.endpoint,
1479                        e
1480                    ))
1481                }
1482            }
1483        }
1484    }
1485
1486    /// Connect using stdio transport
1487    async fn connect_stdio(
1488        &self,
1489        config: &crate::config::mcp::McpStdioServerConfig,
1490    ) -> Result<RunningMcpService> {
1491        let provider_name = &self.config.name;
1492        debug!(
1493            "Setting up stdio connection for provider '{}'",
1494            provider_name
1495        );
1496
1497        debug!("Command: {} with args: {:?}", config.command, config.args);
1498
1499        let mut command = Command::new(&config.command);
1500        command.args(&config.args);
1501
1502        // Set working directory if specified
1503        if let Some(working_dir) = &config.working_directory {
1504            debug!("Using working directory: {}", working_dir);
1505            command.current_dir(working_dir);
1506        }
1507
1508        // Set environment variables if specified
1509        if !self.config.env.is_empty() {
1510            debug!(
1511                "Setting environment variables for provider '{}'",
1512                provider_name
1513            );
1514            command.envs(&self.config.env);
1515        }
1516
1517        // Create new process group to ensure proper cleanup
1518        command.process_group(0);
1519
1520        debug!(
1521            "Creating TokioChildProcess for provider '{}'",
1522            provider_name
1523        );
1524
1525        match TokioChildProcess::new(command) {
1526            Ok(child_process) => {
1527                debug!(
1528                    "Successfully created child process for provider '{}'",
1529                    provider_name
1530                );
1531
1532                // Add timeout and better error handling for the MCP service
1533                let handler = LoggingClientHandler::new(provider_name);
1534
1535                match tokio::time::timeout(
1536                    tokio::time::Duration::from_secs(30),
1537                    handler.serve(child_process),
1538                )
1539                .await
1540                {
1541                    Ok(Ok(connection)) => {
1542                        info!(
1543                            "Successfully established connection to MCP provider '{}'",
1544                            provider_name
1545                        );
1546                        Ok(connection)
1547                    }
1548                    Ok(Err(e)) => {
1549                        // Check if this is a process-related error
1550                        let error_msg = e.to_string();
1551                        if error_msg.contains("No such process")
1552                            || error_msg.contains("ESRCH")
1553                            || error_msg.contains("EPIPE")
1554                            || error_msg.contains("Broken pipe")
1555                            || error_msg.contains("write EPIPE")
1556                        {
1557                            debug!(
1558                                "MCP provider '{}' pipe/process error during connection (normal during shutdown): {}",
1559                                provider_name, e
1560                            );
1561                            Err(anyhow::anyhow!("MCP provider connection terminated: {}", e))
1562                        } else {
1563                            error!(
1564                                "Failed to establish MCP connection for provider '{}': {}",
1565                                provider_name, e
1566                            );
1567                            Err(anyhow::anyhow!("Failed to serve MCP connection: {}", e))
1568                        }
1569                    }
1570                    Err(_timeout) => {
1571                        warn!(
1572                            "MCP provider '{}' connection timed out after 30 seconds",
1573                            provider_name
1574                        );
1575                        Err(anyhow::anyhow!("MCP provider connection timeout"))
1576                    }
1577                }
1578            }
1579            Err(e) => {
1580                // Check if this is a process creation error
1581                let error_msg = e.to_string();
1582                if error_msg.contains("No such process") || error_msg.contains("ESRCH") {
1583                    error!(
1584                        "Failed to create child process for provider '{}' - process may have terminated: {}",
1585                        provider_name, e
1586                    );
1587                } else {
1588                    error!(
1589                        "Failed to create child process for provider '{}': {}",
1590                        provider_name, e
1591                    );
1592                }
1593                Err(anyhow::anyhow!("Failed to create child process: {}", e))
1594            }
1595        }
1596    }
1597}
1598
/// Type alias for a running MCP service.
///
/// This is the `rmcp` running-service handle for the client role, driven by
/// this module's [`LoggingClientHandler`]. It is the value returned by
/// `connect_stdio` once a connection has been established.
type RunningMcpService =
    rmcp::service::RunningService<rmcp::service::RoleClient, LoggingClientHandler>;
1602
/// Status information about the MCP client.
///
/// Produced by `McpClient::get_status`; the field notes below describe how
/// that method fills each value.
#[derive(Debug, Clone)]
pub struct McpClientStatus {
    /// Copy of the client configuration's `enabled` flag.
    pub enabled: bool,
    /// Number of entries in the client's provider map.
    pub provider_count: usize,
    /// Number of entries in the active-connection collection. Reported as `0`
    /// when the lock guarding it could not be taken without waiting.
    pub active_connections: usize,
    /// Keys (provider names) of the client's provider map.
    pub configured_providers: Vec<String>,
}
1611
1612impl McpClient {
1613    /// Get MCP client status information
1614    pub fn get_status(&self) -> McpClientStatus {
1615        McpClientStatus {
1616            enabled: self.config.enabled,
1617            provider_count: self.providers.len(),
1618            active_connections: self
1619                .active_connections
1620                .try_lock()
1621                .map(|connections| connections.len())
1622                .unwrap_or(0),
1623            configured_providers: self.providers.keys().cloned().collect(),
1624        }
1625    }
1626}
1627
/// Trait for MCP tool execution.
///
/// Abstracts the tool-facing surface of an MCP client (execute, list, probe,
/// status). Implementors must be `Send + Sync`; the async methods are
/// provided through `#[async_trait]`.
#[async_trait]
pub trait McpToolExecutor: Send + Sync {
    /// Execute the MCP tool named `tool_name` with the JSON arguments `args`,
    /// returning the tool's result as a JSON value.
    async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value>;

    /// List the available MCP tools as [`McpToolInfo`] entries.
    async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>>;

    /// Check whether an MCP tool named `tool_name` exists.
    ///
    /// Returns `Ok(true)`/`Ok(false)` for a definite answer and `Err` when
    /// the implementation could not determine availability.
    async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool>;

    /// Get MCP client status information.
    fn get_status(&self) -> McpClientStatus;
}
1643
1644#[async_trait]
1645impl McpToolExecutor for McpClient {
1646    async fn execute_mcp_tool(&self, tool_name: &str, args: Value) -> Result<Value> {
1647        self.execute_tool(tool_name, args).await
1648    }
1649
1650    async fn list_mcp_tools(&self) -> Result<Vec<McpToolInfo>> {
1651        self.list_tools().await
1652    }
1653
1654    async fn has_mcp_tool(&self, tool_name: &str) -> Result<bool> {
1655        if self.providers.is_empty() {
1656            return Ok(false);
1657        }
1658
1659        let mut provider_errors = Vec::new();
1660
1661        for (provider_name, provider) in &self.providers {
1662            let provider_id = provider_name.as_str();
1663            match provider.has_tool(tool_name).await {
1664                Ok(true) => {
1665                    if self
1666                        .allowlist
1667                        .read()
1668                        .is_tool_allowed(provider_id, tool_name)
1669                    {
1670                        self.record_tool_provider(provider_id, tool_name);
1671                        return Ok(true);
1672                    }
1673
1674                    self.audit_log(
1675                        Some(provider_id),
1676                        "mcp.tool_denied",
1677                        Level::DEBUG,
1678                        format!(
1679                            "Tool '{}' exists on provider '{}' but is blocked by allow list",
1680                            tool_name, provider_id
1681                        ),
1682                    );
1683                }
1684                Ok(false) => continue,
1685                Err(e) => {
1686                    let error_msg = format!("Error checking provider '{}': {}", provider_name, e);
1687                    warn!("{}", error_msg);
1688                    provider_errors.push(error_msg);
1689                }
1690            }
1691        }
1692
1693        if !provider_errors.is_empty() {
1694            debug!(
1695                "Encountered {} errors while checking tool availability: {:?}",
1696                provider_errors.len(),
1697                provider_errors
1698            );
1699
1700            let summary = provider_errors.join("; ");
1701            return Err(anyhow::anyhow!(
1702                "Failed to confirm MCP tool '{}' availability. {}",
1703                tool_name,
1704                summary
1705            ));
1706        }
1707
1708        Ok(false)
1709    }
1710
1711    fn get_status(&self) -> McpClientStatus {
1712        self.get_status()
1713    }
1714}
1715
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::mcp::{McpStdioServerConfig, McpTransportConfig};
    use rmcp::model::{Content, Meta};
    use serde_json::json;

    /// Builds the `McpToolInfo` fixture shared by the tool-info tests.
    fn sample_tool_info() -> McpToolInfo {
        McpToolInfo {
            name: "test_tool".to_string(),
            description: "A test tool".to_string(),
            provider: "test_provider".to_string(),
            input_schema: json!({
                "type": "object",
                "properties": {
                    "input": {"type": "string"}
                }
            }),
        }
    }

    /// A client built from the default configuration is disabled and has no
    /// providers.
    #[test]
    fn test_mcp_client_creation() {
        let config = McpClientConfig::default();
        let client = McpClient::new(config);
        assert!(!client.config.enabled);
        assert!(client.providers.is_empty());
    }

    /// The identifying fields of a tool info are stored as given.
    #[test]
    fn test_mcp_tool_info() {
        let tool_info = sample_tool_info();

        assert_eq!(tool_info.name, "test_tool");
        assert_eq!(tool_info.provider, "test_provider");
    }

    /// A stdio provider configuration keeps the values it was built with.
    #[test]
    fn test_provider_config() {
        let config = McpProviderConfig {
            name: "test".to_string(),
            transport: McpTransportConfig::Stdio(McpStdioServerConfig {
                command: "echo".to_string(),
                args: vec!["hello".to_string()],
                working_directory: None,
            }),
            env: HashMap::new(),
            enabled: true,
            max_concurrent_requests: 3,
        };

        assert_eq!(config.name, "test");
        assert!(config.enabled);
        assert_eq!(config.max_concurrent_requests, 3);
    }

    /// The description and input schema of a tool info are stored as given.
    ///
    /// Complements `test_mcp_tool_info`, which covers `name` and `provider`.
    #[test]
    fn test_tool_info_creation() {
        let tool_info = sample_tool_info();

        assert_eq!(tool_info.description, "A test tool");
        assert_eq!(
            tool_info.input_schema.get("type").and_then(Value::as_str),
            Some("object")
        );
        assert_eq!(
            tool_info
                .input_schema
                .get("properties")
                .and_then(|properties| properties.get("input"))
                .and_then(|input| input.get("type"))
                .and_then(Value::as_str),
            Some("string")
        );
    }

    /// A successful structured result is flattened together with provider,
    /// tool name and metadata.
    #[test]
    fn test_format_tool_result_success() {
        let mut result = CallToolResult::structured(json!({
            "value": 42,
            "status": "ok"
        }));
        let mut meta = Meta::new();
        meta.insert("query".to_string(), Value::String("tokio".to_string()));
        result.meta = Some(meta);

        let serialized = McpClient::format_tool_result("test", "demo", result).unwrap();
        assert_eq!(
            serialized.get("provider").and_then(Value::as_str),
            Some("test")
        );
        assert_eq!(serialized.get("tool").and_then(Value::as_str), Some("demo"));
        assert_eq!(serialized.get("status").and_then(Value::as_str), Some("ok"));
        assert_eq!(serialized.get("value").and_then(Value::as_i64), Some(42));
        assert_eq!(
            serialized
                .get("meta")
                .and_then(Value::as_object)
                .and_then(|map| map.get("query"))
                .and_then(Value::as_str),
            Some("tokio")
        );
    }

    /// A structured error result becomes an `Err` carrying its message.
    #[test]
    fn test_format_tool_result_error_detection() {
        let result = CallToolResult::structured_error(json!({
            "message": "something went wrong"
        }));

        let error = McpClient::format_tool_result("test", "demo", result).unwrap_err();
        assert!(error.to_string().contains("something went wrong"));
    }

    /// An error result with plain text content becomes an `Err` carrying
    /// that text.
    #[test]
    fn test_format_tool_result_error_from_text_content() {
        let result = CallToolResult::error(vec![Content::text("plain failure")]);

        let error = McpClient::format_tool_result("test", "demo", result).unwrap_err();
        assert!(error.to_string().contains("plain failure"));
    }
}