use std::collections::HashMap;
use std::fs;
use std::io::{BufRead, BufReader, Write};
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::process::{Child, ChildStdin, ChildStdout};
use crate::network_policy::{Decision, NetworkPolicyDecider, host_from_url};
const ERROR_BODY_PREVIEW_BYTES: usize = 200;
fn mask_url_secrets(url: &str) -> String {
if let Ok(parsed) = reqwest::Url::parse(url) {
let mut clone = parsed.clone();
if !parsed.username().is_empty() || parsed.password().is_some() {
let _ = clone.set_username("***");
let _ = clone.set_password(Some("***"));
}
return clone.to_string();
}
url.to_string()
}
fn redact_body_preview(body: &str) -> String {
let mut out = body.to_string();
if let Some(idx) = out.to_lowercase().find("bearer ") {
let tail_start = idx + "bearer ".len();
if tail_start < out.len() {
let end = out[tail_start..]
.find(|c: char| c.is_whitespace() || c == '"' || c == ',')
.map_or(out.len(), |off| tail_start + off);
out.replace_range(tail_start..end, "***");
}
}
for needle in ["api_key=", "apikey=", "api-key=", "token="] {
if let Some(idx) = out.to_lowercase().find(needle) {
let tail_start = idx + needle.len();
let end = out[tail_start..]
.find(|c: char| c.is_whitespace() || c == '&' || c == '"' || c == ',')
.map_or(out.len(), |off| tail_start + off);
out.replace_range(tail_start..end, "***");
}
}
out
}
async fn bounded_body_excerpt(response: reqwest::Response, max_bytes: usize) -> String {
let body_text = response.text().await.unwrap_or_default();
if body_text.is_empty() {
return "<no body>".to_string();
}
let trimmed: String = body_text.chars().take(max_bytes).collect();
let suffix = if body_text.len() > trimmed.len() {
"…"
} else {
""
};
let one_line = trimmed.replace(['\n', '\r'], " ");
format!("{}{}", redact_body_preview(&one_line), suffix)
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct McpConfig {
#[serde(default)]
pub timeouts: McpTimeouts,
#[serde(default, alias = "mcpServers")]
pub servers: HashMap<String, McpServerConfig>,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize)]
#[allow(clippy::struct_field_names)]
pub struct McpTimeouts {
#[serde(default = "default_connect_timeout")]
pub connect_timeout: u64,
#[serde(default = "default_execute_timeout")]
pub execute_timeout: u64,
#[serde(default = "default_read_timeout")]
pub read_timeout: u64,
}
fn default_connect_timeout() -> u64 {
10
}
fn default_execute_timeout() -> u64 {
60
}
fn default_read_timeout() -> u64 {
120
}
impl Default for McpTimeouts {
fn default() -> Self {
Self {
connect_timeout: default_connect_timeout(),
execute_timeout: default_execute_timeout(),
read_timeout: default_read_timeout(),
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpServerConfig {
pub command: Option<String>,
#[serde(default)]
pub args: Vec<String>,
#[serde(default)]
pub env: HashMap<String, String>,
pub url: Option<String>,
#[serde(default)]
pub connect_timeout: Option<u64>,
#[serde(default)]
pub execute_timeout: Option<u64>,
#[serde(default)]
pub read_timeout: Option<u64>,
#[serde(default)]
pub disabled: bool,
#[serde(default = "default_enabled")]
pub enabled: bool,
#[serde(default)]
pub required: bool,
#[serde(default)]
pub enabled_tools: Vec<String>,
#[serde(default)]
pub disabled_tools: Vec<String>,
}
fn default_enabled() -> bool {
true
}
impl McpServerConfig {
pub fn effective_connect_timeout(&self, global: &McpTimeouts) -> u64 {
self.connect_timeout.unwrap_or(global.connect_timeout)
}
pub fn effective_execute_timeout(&self, global: &McpTimeouts) -> u64 {
self.execute_timeout.unwrap_or(global.execute_timeout)
}
pub fn effective_read_timeout(&self, global: &McpTimeouts) -> u64 {
self.read_timeout.unwrap_or(global.read_timeout)
}
pub fn is_enabled(&self) -> bool {
self.enabled && !self.disabled
}
pub fn is_tool_enabled(&self, tool_name: &str) -> bool {
let allowed = if self.enabled_tools.is_empty() {
true
} else {
self.enabled_tools.iter().any(|t| t == tool_name)
};
if !allowed {
return false;
}
!self.disabled_tools.iter().any(|t| t == tool_name)
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpTool {
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(rename = "inputSchema", default)]
pub input_schema: serde_json::Value,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpResource {
pub uri: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(rename = "mimeType", default)]
pub mime_type: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpResourceTemplate {
#[serde(rename = "uriTemplate")]
pub uri_template: String,
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(rename = "mimeType", default)]
pub mime_type: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpPrompt {
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub arguments: Vec<McpPromptArgument>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct McpPromptArgument {
pub name: String,
#[serde(default)]
pub description: Option<String>,
#[serde(default)]
pub required: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
Connecting,
Ready,
Disconnected,
}
#[async_trait::async_trait]
pub trait McpTransport: Send + Sync {
async fn send(&mut self, msg: serde_json::Value) -> Result<()>;
async fn recv(&mut self) -> Result<serde_json::Value>;
}
pub struct StdioTransport {
_child: Child,
stdin: ChildStdin,
reader: tokio::io::BufReader<ChildStdout>,
}
#[async_trait::async_trait]
impl McpTransport for StdioTransport {
async fn send(&mut self, msg: serde_json::Value) -> Result<()> {
let line = serde_json::to_string(&msg)? + "\n";
self.stdin.write_all(line.as_bytes()).await?;
self.stdin.flush().await?;
Ok(())
}
async fn recv(&mut self) -> Result<serde_json::Value> {
let mut line = String::new();
loop {
line.clear();
let bytes = self.reader.read_line(&mut line).await?;
if bytes == 0 {
anyhow::bail!("Stdio transport closed");
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed) {
return Ok(value);
}
}
}
}
pub struct SseTransport {
client: reqwest::Client,
base_url: String,
endpoint_url: Option<String>,
receiver: tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>,
}
impl SseTransport {
pub async fn connect(
client: reqwest::Client,
url: String,
cancel_token: tokio_util::sync::CancellationToken,
) -> Result<Self> {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let client_clone = client.clone();
let url_clone = url.clone();
tokio::spawn(async move {
if cancel_token.is_cancelled() {
return;
}
use futures_util::FutureExt;
let result = std::panic::AssertUnwindSafe(Self::run_sse_loop(
client_clone,
url_clone,
tx,
cancel_token,
))
.catch_unwind()
.await;
match result {
Ok(res) => {
if let Err(e) = res {
tracing::error!("SSE loop error: {}", e);
}
}
Err(panic_err) => {
if let Some(msg) = panic_err.downcast_ref::<&str>() {
tracing::error!("SSE loop panicked: {}", msg);
} else if let Some(msg) = panic_err.downcast_ref::<String>() {
tracing::error!("SSE loop panicked: {}", msg);
} else {
tracing::error!("SSE loop panicked with unknown error");
}
}
}
});
Ok(Self {
client,
base_url: url,
endpoint_url: None,
receiver: rx,
})
}
async fn run_sse_loop(
client: reqwest::Client,
url: String,
tx: tokio::sync::mpsc::UnboundedSender<serde_json::Value>,
cancel_token: tokio_util::sync::CancellationToken,
) -> Result<()> {
let response = client.get(&url).send().await.with_context(|| {
format!(
"MCP SSE connect failed (transport=http url={})",
mask_url_secrets(&url),
)
})?;
let status = response.status();
if !status.is_success() {
let body_excerpt = bounded_body_excerpt(response, ERROR_BODY_PREVIEW_BYTES).await;
anyhow::bail!(
"MCP SSE rejected (transport=http url={} status={}): {}",
mask_url_secrets(&url),
status,
body_excerpt,
);
}
let mut stream = response.bytes_stream();
use futures_util::StreamExt;
let mut buffer = String::new();
loop {
if cancel_token.is_cancelled() {
tracing::debug!("SSE loop cancelled");
break;
}
let item = tokio::select! {
_ = cancel_token.cancelled() => {
tracing::debug!("SSE loop shutting down");
break;
}
item = stream.next() => {
match item {
Some(i) => i,
None => break,
}
}
};
let chunk = item?;
let s = String::from_utf8_lossy(&chunk);
buffer.push_str(&s);
while let Some(pos) = buffer.find("\n\n") {
let event_block = buffer[..pos].to_string();
buffer = buffer[pos + 2..].to_string();
let mut event_type = "message";
let mut data = String::new();
for line in event_block.lines() {
if let Some(stripped) = line.strip_prefix("event: ") {
event_type = stripped;
} else if let Some(stripped) = line.strip_prefix("data: ") {
data.push_str(stripped);
}
}
match event_type {
"endpoint" => {
let _ = tx.send(serde_json::json!({
"__internal_sse_endpoint__": data
}));
}
"message" => {
if let Ok(val) = serde_json::from_str::<serde_json::Value>(&data) {
let _ = tx.send(val);
}
}
_ => {}
}
}
}
Ok(())
}
}
#[async_trait::async_trait]
impl McpTransport for SseTransport {
async fn send(&mut self, msg: serde_json::Value) -> Result<()> {
let endpoint = self
.endpoint_url
.as_ref()
.context("SSE endpoint not yet discovered")?;
let response = self.client.post(endpoint).json(&msg).send().await?;
if !response.status().is_success() {
anyhow::bail!("Failed to send message via SSE POST: {}", response.status());
}
Ok(())
}
async fn recv(&mut self) -> Result<serde_json::Value> {
loop {
let msg = self.receiver.recv().await.context("SSE transport closed")?;
if let Some(endpoint) = msg.get("__internal_sse_endpoint__") {
let url_str = endpoint.as_str().context("Invalid endpoint format")?;
if url_str.starts_with("http") {
self.endpoint_url = Some(url_str.to_string());
} else {
let base = reqwest::Url::parse(&self.base_url)?;
let joined = base.join(url_str)?;
self.endpoint_url = Some(joined.to_string());
}
continue;
}
return Ok(msg);
}
}
}
pub struct McpConnection {
name: String,
transport: Box<dyn McpTransport>,
tools: Vec<McpTool>,
resources: Vec<McpResource>,
resource_templates: Vec<McpResourceTemplate>,
prompts: Vec<McpPrompt>,
request_id: AtomicU64,
state: ConnectionState,
config: McpServerConfig,
cancel_token: tokio_util::sync::CancellationToken,
}
impl McpConnection {
pub async fn connect_with_policy(
name: String,
config: McpServerConfig,
global_timeouts: &McpTimeouts,
network_policy: Option<&NetworkPolicyDecider>,
) -> Result<Self> {
let connect_timeout_secs = config.effective_connect_timeout(global_timeouts);
let cancel_token = tokio_util::sync::CancellationToken::new();
let transport: Box<dyn McpTransport> = if let Some(url) = &config.url {
if let Some(decider) = network_policy
&& let Some(host) = host_from_url(url)
{
match decider.evaluate(&host, "mcp") {
Decision::Allow => {}
Decision::Deny => {
anyhow::bail!(
"MCP server '{name}' connection to '{host}' blocked by network policy"
);
}
Decision::Prompt => {
anyhow::bail!(
"MCP server '{name}' connection to '{host}' requires approval; \
re-run after `/network allow {host}` or set network.default = \"allow\" in config"
);
}
}
}
let client = reqwest::Client::builder()
.timeout(Duration::from_secs(connect_timeout_secs))
.build()?;
Box::new(SseTransport::connect(client, url.clone(), cancel_token.clone()).await?)
} else if let Some(command) = &config.command {
let mut cmd = tokio::process::Command::new(command);
cmd.args(&config.args)
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::null())
.kill_on_drop(true);
for (key, value) in &config.env {
cmd.env(key, value);
}
let mut child = cmd.spawn().with_context(|| {
let env_keys: Vec<&str> = config.env.keys().map(String::as_str).collect();
format!(
"MCP stdio spawn failed (transport=stdio server={name} cmd={command:?} args={:?} env_keys={env_keys:?})",
config.args,
)
})?;
let stdin = child.stdin.take().context("Failed to get MCP stdin")?;
let stdout = child.stdout.take().context("Failed to get MCP stdout")?;
Box::new(StdioTransport {
_child: child,
stdin,
reader: tokio::io::BufReader::new(stdout),
})
} else {
anyhow::bail!(
"MCP server '{}' config must have either 'command' or 'url'",
name
);
};
let mut conn = Self {
name: name.clone(),
transport,
tools: Vec::new(),
resources: Vec::new(),
resource_templates: Vec::new(),
prompts: Vec::new(),
request_id: AtomicU64::new(1),
state: ConnectionState::Connecting,
config,
cancel_token,
};
tokio::time::timeout(Duration::from_secs(connect_timeout_secs), conn.initialize())
.await
.with_context(|| format!("MCP server '{name}' initialization timed out"))??;
tokio::time::timeout(
Duration::from_secs(connect_timeout_secs),
conn.discover_all(),
)
.await
.with_context(|| format!("MCP server '{name}' discovery timed out"))??;
conn.state = ConnectionState::Ready;
Ok(conn)
}
async fn initialize(&mut self) -> Result<()> {
let init_id = self.next_id();
self.send(serde_json::json!({
"jsonrpc": "2.0",
"id": init_id,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"clientInfo": {
"name": "deepseek-tui",
"version": env!("CARGO_PKG_VERSION")
},
"capabilities": {
"tools": {},
"resources": {},
"prompts": {}
}
}
}))
.await?;
self.recv(init_id).await?;
self.send(serde_json::json!({
"jsonrpc": "2.0",
"method": "notifications/initialized"
}))
.await?;
Ok(())
}
async fn discover_all(&mut self) -> Result<()> {
self.discover_tools().await?;
self.discover_resources().await?;
self.discover_resource_templates().await?;
self.discover_prompts().await?;
Ok(())
}
async fn discover_tools(&mut self) -> Result<()> {
let list_id = self.next_id();
self.send(serde_json::json!({
"jsonrpc": "2.0",
"id": list_id,
"method": "tools/list",
"params": {}
}))
.await?;
let response = self.recv(list_id).await?;
if let Some(result) = response.get("result")
&& let Some(tools) = result.get("tools")
{
self.tools = serde_json::from_value(tools.clone()).unwrap_or_default();
}
Ok(())
}
async fn discover_resources(&mut self) -> Result<()> {
let list_id = self.next_id();
self.send(serde_json::json!({
"jsonrpc": "2.0",
"id": list_id,
"method": "resources/list",
"params": {}
}))
.await?;
let response = self.recv(list_id).await?;
if let Some(result) = response.get("result")
&& let Some(resources) = result.get("resources")
{
self.resources = serde_json::from_value(resources.clone()).unwrap_or_default();
}
Ok(())
}
async fn discover_resource_templates(&mut self) -> Result<()> {
let list_id = self.next_id();
self.send(serde_json::json!({
"jsonrpc": "2.0",
"id": list_id,
"method": "resources/templates/list",
"params": {}
}))
.await?;
let response = self.recv(list_id).await?;
if let Some(result) = response.get("result") {
let templates = result
.get("resourceTemplates")
.or_else(|| result.get("templates"))
.or_else(|| result.get("resource_templates"));
if let Some(templates) = templates {
self.resource_templates =
serde_json::from_value(templates.clone()).unwrap_or_default();
}
}
Ok(())
}
async fn discover_prompts(&mut self) -> Result<()> {
let list_id = self.next_id();
self.send(serde_json::json!({
"jsonrpc": "2.0",
"id": list_id,
"method": "prompts/list",
"params": {}
}))
.await?;
let response = self.recv(list_id).await?;
if let Some(result) = response.get("result")
&& let Some(prompts) = result.get("prompts")
{
self.prompts = serde_json::from_value(prompts.clone()).unwrap_or_default();
}
Ok(())
}
pub async fn call_tool(
&mut self,
tool_name: &str,
arguments: serde_json::Value,
timeout_secs: u64,
) -> Result<serde_json::Value> {
self.call_method(
"tools/call",
serde_json::json!({
"name": tool_name,
"arguments": arguments
}),
timeout_secs,
)
.await
}
pub async fn read_resource(
&mut self,
uri: &str,
timeout_secs: u64,
) -> Result<serde_json::Value> {
self.call_method(
"resources/read",
serde_json::json!({
"uri": uri
}),
timeout_secs,
)
.await
}
pub async fn get_prompt(
&mut self,
prompt_name: &str,
arguments: serde_json::Value,
timeout_secs: u64,
) -> Result<serde_json::Value> {
self.call_method(
"prompts/get",
serde_json::json!({
"name": prompt_name,
"arguments": arguments
}),
timeout_secs,
)
.await
}
async fn call_method(
&mut self,
method: &str,
params: serde_json::Value,
timeout_secs: u64,
) -> Result<serde_json::Value> {
if self.state != ConnectionState::Ready {
anyhow::bail!(
"Failed to call MCP method '{}': connection '{}' is not ready",
method,
self.name
);
}
let call_id = self.next_id();
self.send(serde_json::json!({
"jsonrpc": "2.0",
"id": call_id,
"method": method,
"params": params
}))
.await?;
let response = tokio::time::timeout(Duration::from_secs(timeout_secs), self.recv(call_id))
.await
.with_context(|| {
format!(
"MCP method '{}' on server '{}' timed out after {}s",
method, self.name, timeout_secs
)
})??;
if let Some(error) = response.get("error") {
return Err(anyhow::anyhow!(
"MCP error in '{}': {}",
method,
serde_json::to_string_pretty(error)?
));
}
Ok(response
.get("result")
.cloned()
.unwrap_or(serde_json::json!(null)))
}
pub fn tools(&self) -> &[McpTool] {
&self.tools
}
pub fn resources(&self) -> &[McpResource] {
&self.resources
}
pub fn resource_templates(&self) -> &[McpResourceTemplate] {
&self.resource_templates
}
pub fn prompts(&self) -> &[McpPrompt] {
&self.prompts
}
#[allow(dead_code)] pub fn name(&self) -> &str {
&self.name
}
pub fn is_ready(&self) -> bool {
self.state == ConnectionState::Ready
}
pub fn config(&self) -> &McpServerConfig {
&self.config
}
#[allow(dead_code)] pub fn state(&self) -> ConnectionState {
self.state
}
fn next_id(&self) -> u64 {
self.request_id.fetch_add(1, Ordering::SeqCst)
}
async fn send(&mut self, msg: serde_json::Value) -> Result<()> {
self.transport.send(msg).await
}
async fn recv(&mut self, expected_id: u64) -> Result<serde_json::Value> {
loop {
let value = self.transport.recv().await.inspect_err(|_e| {
self.state = ConnectionState::Disconnected;
})?;
if value.get("id").and_then(serde_json::Value::as_u64) == Some(expected_id) {
return Ok(value);
}
}
}
#[allow(dead_code)] pub fn close(&mut self) {
self.cancel_token.cancel();
self.state = ConnectionState::Disconnected;
}
}
impl Drop for McpConnection {
fn drop(&mut self) {
self.cancel_token.cancel();
}
}
pub struct McpPool {
connections: HashMap<String, McpConnection>,
config: McpConfig,
network_policy: Option<NetworkPolicyDecider>,
}
impl McpPool {
pub fn new(config: McpConfig) -> Self {
Self {
connections: HashMap::new(),
config,
network_policy: None,
}
}
pub fn from_config_path(path: &std::path::Path) -> Result<Self> {
let config = if path.exists() {
let contents = fs::read_to_string(path)
.with_context(|| format!("Failed to read MCP config: {}", path.display()))?;
serde_json::from_str(&contents)
.with_context(|| format!("Failed to parse MCP config: {}", path.display()))?
} else {
McpConfig::default()
};
Ok(Self::new(config))
}
pub fn with_network_policy(mut self, policy: NetworkPolicyDecider) -> Self {
self.network_policy = Some(policy);
self
}
pub async fn get_or_connect(&mut self, server_name: &str) -> Result<&mut McpConnection> {
let is_ready = self
.connections
.get(server_name)
.map(|conn| conn.is_ready())
.unwrap_or(false);
if is_ready {
return self
.connections
.get_mut(server_name)
.ok_or_else(|| anyhow::anyhow!("MCP connection disappeared for {server_name}"));
}
self.connections.remove(server_name);
let server_config = self
.config
.servers
.get(server_name)
.ok_or_else(|| anyhow::anyhow!("Failed to find MCP server: {server_name}"))?
.clone();
if !server_config.is_enabled() {
anyhow::bail!("Failed to connect MCP server '{server_name}': server is disabled");
}
let connection = McpConnection::connect_with_policy(
server_name.to_string(),
server_config,
&self.config.timeouts,
self.network_policy.as_ref(),
)
.await?;
self.connections.insert(server_name.to_string(), connection);
self.connections
.get_mut(server_name)
.ok_or_else(|| anyhow::anyhow!("Failed to store MCP connection for {server_name}"))
}
pub async fn connect_all(&mut self) -> Vec<(String, anyhow::Error)> {
let mut errors = Vec::new();
let names: Vec<String> = self
.config
.servers
.keys()
.filter(|n| self.config.servers[*n].is_enabled())
.cloned()
.collect();
for name in names {
if let Err(e) = self.get_or_connect(&name).await {
errors.push((name, e));
}
}
for (name, server_cfg) in &self.config.servers {
if server_cfg.required
&& server_cfg.is_enabled()
&& !self
.connections
.get(name)
.is_some_and(McpConnection::is_ready)
{
errors.push((
name.clone(),
anyhow::anyhow!("required MCP server failed to initialize"),
));
}
}
errors
}
pub fn all_tools(&self) -> Vec<(String, &McpTool)> {
let mut tools = Vec::new();
for (server, conn) in &self.connections {
for tool in conn.tools() {
if !conn.config().is_tool_enabled(&tool.name) {
continue;
}
tools.push((format!("mcp_{}_{}", server, tool.name), tool));
}
}
tools
}
pub fn all_resources(&self) -> Vec<(String, &McpResource)> {
let mut resources = Vec::new();
for (server, conn) in &self.connections {
for resource in conn.resources() {
let safe_name = resource.name.replace(' ', "_").to_lowercase();
resources.push((format!("mcp_{}_{}", server, safe_name), resource));
}
}
resources
}
#[allow(dead_code)] pub fn all_resource_templates(&self) -> Vec<(String, &McpResourceTemplate)> {
let mut templates = Vec::new();
for (server, conn) in &self.connections {
for template in conn.resource_templates() {
let safe_name = template.name.replace(' ', "_").to_lowercase();
templates.push((format!("mcp_{}_{}", server, safe_name), template));
}
}
templates
}
async fn list_resources(&mut self, server: Option<String>) -> Result<Vec<serde_json::Value>> {
if let Some(server_name) = server {
let conn = self.get_or_connect(&server_name).await?;
let resources = conn
.resources()
.iter()
.map(|resource| {
serde_json::json!({
"server": server_name.clone(),
"uri": resource.uri,
"name": resource.name,
"description": resource.description,
"mime_type": resource.mime_type,
})
})
.collect();
return Ok(resources);
}
let _ = self.connect_all().await;
let mut items = Vec::new();
for (server, conn) in &self.connections {
for resource in conn.resources() {
items.push(serde_json::json!({
"server": server,
"uri": resource.uri,
"name": resource.name,
"description": resource.description,
"mime_type": resource.mime_type,
}));
}
}
Ok(items)
}
async fn list_resource_templates(
&mut self,
server: Option<String>,
) -> Result<Vec<serde_json::Value>> {
if let Some(server_name) = server {
let conn = self.get_or_connect(&server_name).await?;
let templates = conn
.resource_templates()
.iter()
.map(|template| {
serde_json::json!({
"server": server_name.clone(),
"uri_template": template.uri_template,
"name": template.name,
"description": template.description,
"mime_type": template.mime_type,
})
})
.collect();
return Ok(templates);
}
let _ = self.connect_all().await;
let mut items = Vec::new();
for (server, conn) in &self.connections {
for template in conn.resource_templates() {
items.push(serde_json::json!({
"server": server,
"uri_template": template.uri_template,
"name": template.name,
"description": template.description,
"mime_type": template.mime_type,
}));
}
}
Ok(items)
}
pub fn all_prompts(&self) -> Vec<(String, &McpPrompt)> {
let mut prompts = Vec::new();
for (server, conn) in &self.connections {
for prompt in conn.prompts() {
prompts.push((format!("mcp_{}_{}", server, prompt.name), prompt));
}
}
prompts
}
pub async fn read_resource(
&mut self,
server_name: &str,
uri: &str,
) -> Result<serde_json::Value> {
let global_timeouts = self.config.timeouts;
let conn = self.get_or_connect(server_name).await?;
let timeout = conn.config().effective_read_timeout(&global_timeouts);
conn.read_resource(uri, timeout).await
}
pub async fn get_prompt(
&mut self,
server_name: &str,
prompt_name: &str,
arguments: serde_json::Value,
) -> Result<serde_json::Value> {
let global_timeouts = self.config.timeouts;
let conn = self.get_or_connect(server_name).await?;
let timeout = conn.config().effective_execute_timeout(&global_timeouts);
conn.get_prompt(prompt_name, arguments, timeout).await
}
fn parse_prefixed_name<'a>(&self, prefixed_name: &'a str) -> Result<(&'a str, &'a str)> {
if !prefixed_name.starts_with("mcp_") {
anyhow::bail!("Invalid MCP tool name: {}", prefixed_name);
}
let rest = &prefixed_name[4..];
let Some((server, tool)) = rest.split_once('_') else {
anyhow::bail!("Invalid MCP tool name format: {}", prefixed_name);
};
Ok((server, tool))
}
pub fn to_api_tools(&self) -> Vec<crate::models::Tool> {
let mut api_tools = Vec::new();
for (name, tool) in self.all_tools() {
api_tools.push(crate::models::Tool {
tool_type: None,
name,
description: tool.description.clone().unwrap_or_default(),
input_schema: tool.input_schema.clone(),
allowed_callers: Some(vec!["direct".to_string()]),
defer_loading: Some(false),
input_examples: None,
strict: None,
cache_control: None,
});
}
if !self.config.servers.is_empty() {
api_tools.push(crate::models::Tool {
tool_type: None,
name: "list_mcp_resources".to_string(),
description: "List available MCP resources across servers (optionally filtered by server).".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Optional MCP server name to filter by" }
}
}),
allowed_callers: Some(vec!["direct".to_string()]),
defer_loading: Some(false),
input_examples: None,
strict: None,
cache_control: None,
});
api_tools.push(crate::models::Tool {
tool_type: None,
name: "list_mcp_resource_templates".to_string(),
description: "List available MCP resource templates across servers (optionally filtered by server).".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "Optional MCP server name to filter by" }
}
}),
allowed_callers: Some(vec!["direct".to_string()]),
defer_loading: Some(false),
input_examples: None,
strict: None,
cache_control: None,
});
}
let resources = self.all_resources();
if !resources.is_empty() {
api_tools.push(crate::models::Tool {
tool_type: None,
name: "mcp_read_resource".to_string(),
description: "Read a resource from an MCP server using its URI".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "The name of the MCP server" },
"uri": { "type": "string", "description": "The URI of the resource to read" }
},
"required": ["server", "uri"]
}),
allowed_callers: Some(vec!["direct".to_string()]),
defer_loading: Some(false),
input_examples: None,
strict: None,
cache_control: None,
});
api_tools.push(crate::models::Tool {
tool_type: None,
name: "read_mcp_resource".to_string(),
description: "Alias for mcp_read_resource.".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "The name of the MCP server" },
"uri": { "type": "string", "description": "The URI of the resource to read" }
},
"required": ["server", "uri"]
}),
allowed_callers: Some(vec!["direct".to_string()]),
defer_loading: Some(false),
input_examples: None,
strict: None,
cache_control: None,
});
}
let prompts = self.all_prompts();
if !prompts.is_empty() {
api_tools.push(crate::models::Tool {
tool_type: None,
name: "mcp_get_prompt".to_string(),
description: "Get a prompt from an MCP server".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"server": { "type": "string", "description": "The name of the MCP server" },
"name": { "type": "string", "description": "The name of the prompt" },
"arguments": {
"type": "object",
"description": "Optional arguments for the prompt",
"additionalProperties": { "type": "string" }
}
},
"required": ["server", "name"]
}),
allowed_callers: Some(vec!["direct".to_string()]),
defer_loading: Some(false),
input_examples: None,
strict: None,
cache_control: None,
});
}
api_tools
}
pub async fn call_tool(
&mut self,
prefixed_name: &str,
arguments: serde_json::Value,
) -> Result<serde_json::Value> {
if prefixed_name == "list_mcp_resources" {
let server = arguments
.get("server")
.and_then(|v| v.as_str())
.map(str::to_string);
let resources = self.list_resources(server).await?;
return Ok(serde_json::json!({ "resources": resources }));
}
if prefixed_name == "list_mcp_resource_templates" {
let server = arguments
.get("server")
.and_then(|v| v.as_str())
.map(str::to_string);
let templates = self.list_resource_templates(server).await?;
return Ok(serde_json::json!({ "templates": templates }));
}
if prefixed_name == "mcp_read_resource" {
let server_name = arguments
.get("server")
.and_then(|v| v.as_str())
.context("Missing 'server' argument")?;
let uri = arguments
.get("uri")
.and_then(|v| v.as_str())
.context("Missing 'uri' argument")?;
return self.read_resource(server_name, uri).await;
}
if prefixed_name == "read_mcp_resource" {
let server_name = arguments
.get("server")
.and_then(|v| v.as_str())
.context("Missing 'server' argument")?;
let uri = arguments
.get("uri")
.and_then(|v| v.as_str())
.context("Missing 'uri' argument")?;
return self.read_resource(server_name, uri).await;
}
if prefixed_name == "mcp_get_prompt" {
let server_name = arguments
.get("server")
.and_then(|v| v.as_str())
.context("Missing 'server' argument")?;
let name = arguments
.get("name")
.and_then(|v| v.as_str())
.context("Missing 'name' argument")?;
let args = arguments
.get("arguments")
.cloned()
.unwrap_or(serde_json::json!({}));
return self.get_prompt(server_name, name, args).await;
}
let (server_name, tool_name) = self.parse_prefixed_name(prefixed_name)?;
let global_timeouts = self.config.timeouts;
let conn = self.get_or_connect(server_name).await?;
if !conn.config().is_tool_enabled(tool_name) {
anyhow::bail!("MCP tool '{tool_name}' is disabled for server '{server_name}'");
}
let timeout = conn.config().effective_execute_timeout(&global_timeouts);
conn.call_tool(tool_name, arguments, timeout).await
}
#[allow(dead_code)] pub fn server_names(&self) -> Vec<&str> {
self.config
.servers
.keys()
.map(std::string::String::as_str)
.collect()
}
pub fn connected_servers(&self) -> Vec<&str> {
self.connections
.iter()
.filter(|(_, c)| c.is_ready())
.map(|(n, _)| n.as_str())
.collect()
}
#[allow(dead_code)] pub fn disconnect_all(&mut self) {
self.connections.clear();
}
#[allow(dead_code)] pub fn config(&self) -> &McpConfig {
&self.config
}
pub fn is_mcp_tool(name: &str) -> bool {
name.starts_with("mcp_")
|| matches!(
name,
"list_mcp_resources" | "list_mcp_resource_templates" | "read_mcp_resource"
)
}
}
#[allow(dead_code)] pub fn format_tool_result(result: &serde_json::Value) -> String {
let is_error = result
.get("isError")
.and_then(serde_json::Value::as_bool)
.unwrap_or(false);
let content = result
.get("content")
.and_then(|v| v.as_array())
.map_or_else(
|| serde_json::to_string_pretty(result).unwrap_or_default(),
|arr| {
arr.iter()
.filter_map(|item| match item.get("type")?.as_str()? {
"text" => item.get("text")?.as_str().map(String::from),
other => Some(format!("[{other} content]")),
})
.collect::<Vec<_>>()
.join("\n")
},
);
if is_error {
format!("Error: {content}")
} else {
content
}
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub struct McpServerInput {
pub name: String,
pub command: String,
pub args: Vec<String>,
pub env: Vec<String>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
#[allow(dead_code)] struct LegacyMcpServer {
command: String,
args: Vec<String>,
env: HashMap<String, String>,
#[serde(default)]
connect_timeout: Option<u64>,
#[serde(default)]
execute_timeout: Option<u64>,
#[serde(default)]
read_timeout: Option<u64>,
}
#[derive(Debug, Serialize, Deserialize, Default)]
#[allow(dead_code)] struct LegacyMcpConfig {
#[serde(default, alias = "mcpServers")]
servers: HashMap<String, LegacyMcpServer>,
#[serde(default)]
timeouts: McpTimeouts,
}
#[allow(dead_code)] pub fn list(path: &Path) -> Result<()> {
let config = load_legacy(path)?;
if config.servers.is_empty() {
println!("No MCP servers configured.");
return Ok(());
}
for (name, server) in config.servers {
println!("{} -> {} {}", name, server.command, server.args.join(" "));
}
Ok(())
}
#[allow(dead_code)] pub fn add(path: &Path, input: McpServerInput) -> Result<()> {
let mut config = load_legacy(path)?;
let env = parse_env(&input.env)?;
config.servers.insert(
input.name.clone(),
LegacyMcpServer {
command: input.command,
args: input.args,
env,
connect_timeout: None,
execute_timeout: None,
read_timeout: None,
},
);
save_legacy(path, &config)?;
println!("Added MCP server: {}", input.name);
Ok(())
}
#[allow(dead_code)] pub fn remove(path: &Path, name: &str) -> Result<()> {
let mut config = load_legacy(path)?;
if config.servers.remove(name).is_some() {
save_legacy(path, &config)?;
println!("Removed MCP server: {name}");
} else {
println!("No MCP server named {name}.");
}
Ok(())
}
#[allow(dead_code)] pub fn call_tool(
path: &Path,
server: &str,
tool: &str,
args: &serde_json::Value,
) -> Result<String> {
let config = load_legacy(path)?;
let Some(server_cfg) = config.servers.get(server) else {
anyhow::bail!("Failed to find MCP server: {server}");
};
let timeouts = config.timeouts;
let connect_timeout = server_cfg
.connect_timeout
.unwrap_or(timeouts.connect_timeout);
let execute_timeout = server_cfg
.execute_timeout
.unwrap_or(timeouts.execute_timeout);
let read_timeout = server_cfg.read_timeout.unwrap_or(timeouts.read_timeout);
let mut cmd = Command::new(&server_cfg.command);
cmd.args(&server_cfg.args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());
for (key, value) in &server_cfg.env {
cmd.env(key, value);
}
let mut child = cmd.spawn().with_context(|| "Failed to spawn MCP server")?;
let mut stdin = child.stdin.take().context("Failed to open MCP stdin")?;
let stdout = child.stdout.take().context("Failed to open MCP stdout")?;
let reader = Arc::new(Mutex::new(BufReader::new(stdout)));
let child = Arc::new(Mutex::new(child));
let init_id = next_id();
let init_payload = serde_json::json!({
"jsonrpc": "2.0",
"id": init_id,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"clientInfo": { "name": "deepseek-tui", "version": env!("CARGO_PKG_VERSION") },
"capabilities": {}
}
});
send_request_sync(&mut stdin, &init_payload)?;
if let Err(e) = read_response_with_timeout(
&reader,
&child,
init_id,
Duration::from_secs(connect_timeout),
read_timeout,
) {
if let Ok(mut child_guard) = child.lock() {
let _ = child_guard.kill();
}
return Err(e);
}
let initialized_payload = serde_json::json!({
"jsonrpc": "2.0",
"method": "initialized",
"params": {}
});
send_request_sync(&mut stdin, &initialized_payload)?;
let call_id = next_id();
let call_payload = serde_json::json!({
"jsonrpc": "2.0",
"id": call_id,
"method": "tools/call",
"params": {
"name": tool,
"arguments": args
}
});
send_request_sync(&mut stdin, &call_payload)?;
let response = match read_response_with_timeout(
&reader,
&child,
call_id,
Duration::from_secs(execute_timeout),
read_timeout,
) {
Ok(result) => result,
Err(e) => {
if let Ok(mut child_guard) = child.lock() {
let _ = child_guard.kill();
}
return Err(e);
}
};
if let Ok(mut child_guard) = child.lock() {
let _ = child_guard.kill();
}
if let Some(result) = response.get("result") {
return Ok(serde_json::to_string_pretty(result)?);
}
if let Some(error) = response.get("error") {
return Ok(serde_json::to_string_pretty(error)?);
}
Ok(serde_json::to_string_pretty(&response)?)
}
#[allow(dead_code)] fn load_legacy(path: &Path) -> Result<LegacyMcpConfig> {
if path.exists() {
let contents = fs::read_to_string(path)
.with_context(|| format!("Failed to read {}", path.display()))?;
let config = serde_json::from_str(&contents)
.with_context(|| format!("Failed to parse {}", path.display()))?;
Ok(config)
} else {
Ok(LegacyMcpConfig::default())
}
}
#[allow(dead_code)] fn save_legacy(path: &Path, config: &LegacyMcpConfig) -> Result<()> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
let contents = serde_json::to_string_pretty(config)?;
fs::write(path, contents)?;
Ok(())
}
#[allow(dead_code)] fn parse_env(items: &[String]) -> Result<HashMap<String, String>> {
let mut env = HashMap::new();
for item in items {
let parts: Vec<&str> = item.splitn(2, '=').collect();
if parts.len() != 2 {
anyhow::bail!("Failed to parse MCP env var '{item}': expected KEY=VALUE");
}
env.insert(parts[0].to_string(), parts[1].to_string());
}
Ok(env)
}
#[allow(dead_code)] fn send_request_sync(stdin: &mut impl Write, payload: &serde_json::Value) -> Result<()> {
let line = serde_json::to_string(payload)?;
stdin
.write_all(format!("{line}\n").as_bytes())
.with_context(|| "Failed to write MCP request")?;
stdin.flush()?;
Ok(())
}
#[allow(dead_code)] fn read_response_with_timeout(
reader: &Arc<Mutex<BufReader<std::process::ChildStdout>>>,
child: &Arc<Mutex<std::process::Child>>,
id: u64,
timeout: Duration,
read_timeout: u64,
) -> Result<serde_json::Value> {
let effective_timeout = Duration::from_secs(timeout.as_secs().min(read_timeout));
let (tx, rx) = std::sync::mpsc::channel();
let reader_clone = Arc::clone(reader);
std::thread::spawn(move || {
let result = read_response_sync(&reader_clone, id);
let _ = tx.send(result);
});
if let Ok(result) = rx.recv_timeout(effective_timeout) {
result
} else {
if let Ok(mut child_guard) = child.lock() {
let _ = child_guard.kill();
}
anyhow::bail!(
"Failed to read MCP response: timed out after {}s",
effective_timeout.as_secs()
)
}
}
#[allow(dead_code)] fn read_response_sync(
reader: &Arc<Mutex<BufReader<std::process::ChildStdout>>>,
id: u64,
) -> Result<serde_json::Value> {
let mut line = String::new();
loop {
line.clear();
let read = {
let mut guard = reader
.lock()
.map_err(|_| anyhow::anyhow!("MCP reader lock poisoned"))?;
guard.read_line(&mut line)?
};
if read == 0 {
anyhow::bail!("Failed to read MCP response: server closed output before responding.");
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(value) = serde_json::from_str::<serde_json::Value>(trimmed)
&& value.get("id").and_then(serde_json::Value::as_u64) == Some(id)
{
return Ok(value);
}
}
}
#[allow(dead_code)] fn next_id() -> u64 {
let micros = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_micros();
u64::try_from(micros).unwrap_or(u64::MAX)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_mcp_config_defaults() {
let config = McpConfig::default();
assert_eq!(config.timeouts.connect_timeout, 10);
assert_eq!(config.timeouts.execute_timeout, 60);
assert_eq!(config.timeouts.read_timeout, 120);
assert!(config.servers.is_empty());
}
#[test]
fn test_mcp_config_parse() {
let json = r#"{
"timeouts": {
"connect_timeout": 15,
"execute_timeout": 90
},
"servers": {
"test": {
"command": "node",
"args": ["server.js"],
"env": {"FOO": "bar"}
}
}
}"#;
let config: McpConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.timeouts.connect_timeout, 15);
assert_eq!(config.timeouts.execute_timeout, 90);
assert_eq!(config.timeouts.read_timeout, 120); assert!(config.servers.contains_key("test"));
let server = config.servers.get("test").unwrap();
assert_eq!(server.command, Some("node".to_string()));
assert_eq!(server.args, vec!["server.js"]);
assert_eq!(server.env.get("FOO"), Some(&"bar".to_string()));
}
#[test]
fn test_server_effective_timeouts() {
let global = McpTimeouts::default();
let server_with_override = McpServerConfig {
command: Some("test".to_string()),
args: vec![],
env: HashMap::new(),
url: None,
connect_timeout: Some(20),
execute_timeout: None,
read_timeout: Some(180),
disabled: false,
enabled: true,
required: false,
enabled_tools: Vec::new(),
disabled_tools: Vec::new(),
};
assert_eq!(server_with_override.effective_connect_timeout(&global), 20);
assert_eq!(server_with_override.effective_execute_timeout(&global), 60); assert_eq!(server_with_override.effective_read_timeout(&global), 180);
}
#[test]
fn test_mcp_pool_is_mcp_tool() {
assert!(McpPool::is_mcp_tool("mcp_filesystem_read"));
assert!(McpPool::is_mcp_tool("mcp_git_status"));
assert!(McpPool::is_mcp_tool("list_mcp_resources"));
assert!(McpPool::is_mcp_tool("list_mcp_resource_templates"));
assert!(McpPool::is_mcp_tool("read_mcp_resource"));
assert!(!McpPool::is_mcp_tool("read_file"));
assert!(!McpPool::is_mcp_tool("exec_shell"));
}
#[test]
fn test_format_tool_result_text() {
let result = serde_json::json!({
"content": [
{"type": "text", "text": "Hello, world!"}
]
});
assert_eq!(format_tool_result(&result), "Hello, world!");
}
#[test]
fn test_format_tool_result_error() {
let result = serde_json::json!({
"isError": true,
"content": [
{"type": "text", "text": "Something went wrong"}
]
});
assert_eq!(format_tool_result(&result), "Error: Something went wrong");
}
#[test]
fn test_format_tool_result_multiple_content() {
let result = serde_json::json!({
"content": [
{"type": "text", "text": "Line 1"},
{"type": "text", "text": "Line 2"},
{"type": "image", "data": "base64..."}
]
});
let formatted = format_tool_result(&result);
assert!(formatted.contains("Line 1"));
assert!(formatted.contains("Line 2"));
assert!(formatted.contains("[image content]"));
}
#[test]
#[cfg(unix)]
fn test_read_response_timeout_kills_child() {
let mut child = Command::new("sh")
.arg("-c")
.arg("sleep 5")
.stdout(Stdio::piped())
.spawn()
.expect("spawn sleep");
let stdout = child.stdout.take().expect("stdout");
let reader = Arc::new(Mutex::new(BufReader::new(stdout)));
let child = Arc::new(Mutex::new(child));
let result = read_response_with_timeout(&reader, &child, 1, Duration::from_secs(1), 1);
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("timed out"));
let status = child
.lock()
.expect("lock child")
.wait()
.expect("wait child");
assert!(!status.success());
}
#[tokio::test]
async fn test_mcp_pool_empty_config() {
let pool = McpPool::new(McpConfig::default());
assert!(pool.server_names().is_empty());
assert!(pool.all_tools().is_empty());
}
#[test]
fn mask_url_secrets_strips_userinfo() {
let masked = mask_url_secrets("https://user:s3cret@host.example/api?foo=bar");
assert!(masked.contains("***"), "expected masked userinfo: {masked}");
assert!(!masked.contains("s3cret"), "secret leaked: {masked}");
assert!(masked.contains("host.example"), "host preserved: {masked}");
}
#[test]
fn mask_url_secrets_passes_through_clean_url() {
assert_eq!(
mask_url_secrets("https://api.example.com/mcp"),
"https://api.example.com/mcp"
);
}
#[test]
fn redact_body_preview_masks_bearer_token() {
let redacted = redact_body_preview("Authorization: Bearer abc.def.ghi end");
assert!(redacted.contains("Bearer ***"), "redacted: {redacted}");
assert!(!redacted.contains("abc.def.ghi"), "leaked: {redacted}");
}
#[test]
fn redact_body_preview_masks_api_key_param() {
let redacted = redact_body_preview("error message api_key=sk-12345&other=val");
assert!(redacted.contains("api_key=***"), "redacted: {redacted}");
assert!(!redacted.contains("sk-12345"), "leaked: {redacted}");
assert!(
redacted.contains("other=val"),
"non-secret preserved: {redacted}"
);
}
}