1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
//! MCP server lifecycle management.
//!
//! Manages multiple MCP server processes, handles tool discovery,
//! and routes tool calls to the correct server.
use anyhow::{Result, anyhow};
use std::collections::HashMap;
use tracing::{info, warn};
use super::client::{ContentBlock, McpClient, McpToolDef, McpToolResult};
use super::transport::StdioTransport;
use crate::app::McpServerConfig;
/// Manages multiple MCP server connections.
pub struct McpServerManager {
/// Active server connections: server_name → client
servers: HashMap<String, McpClient>,
/// Cached tool definitions: (server_name, tool_def)
tools: Vec<(String, McpToolDef)>,
}
impl McpServerManager {
/// Start all configured MCP servers, initialize them, and discover tools.
///
/// Servers that fail to start are logged and skipped (non-fatal).
pub async fn start(configs: &HashMap<String, McpServerConfig>) -> Self {
let mut servers = HashMap::new();
let mut all_tools = Vec::new();
for (name, config) in configs {
info!(
"Starting MCP server: {} ({} {})",
name,
config.command,
config.args.join(" ")
);
match Self::start_one(name, config).await {
Ok((client, tools)) => {
let tool_count = tools.len();
for tool in &tools {
all_tools.push((name.clone(), tool.clone()));
}
info!(
"MCP server '{}' ready: {} tools ({})",
name,
tool_count,
client
.server_info
.as_ref()
.map(|s| s.name.as_str())
.unwrap_or("?")
);
servers.insert(name.clone(), client);
},
Err(e) => {
warn!("Failed to start MCP server '{}': {}", name, e);
},
}
}
Self {
servers,
tools: all_tools,
}
}
/// Start a single MCP server, initialize, and list tools.
async fn start_one(
name: &str,
config: &McpServerConfig,
) -> Result<(McpClient, Vec<McpToolDef>)> {
let transport = StdioTransport::spawn(&config.command, &config.args, &config.env).await?;
let mut client = McpClient::new(transport);
client
.initialize()
.await
.map_err(|e| anyhow!("MCP server '{}' initialization failed: {}", name, e))?;
let tools = client
.list_tools()
.await
.map_err(|e| anyhow!("MCP server '{}' tool discovery failed: {}", name, e))?;
Ok((client, tools))
}
/// Get all discovered tools with their server names.
pub fn get_all_tools(&self) -> &[(String, McpToolDef)] {
&self.tools
}
/// True iff the named server started and has an active client,
/// even if it advertised zero tools.
pub fn has_server(&self, name: &str) -> bool {
self.servers.contains_key(name)
}
/// Check if any MCP servers are active.
pub fn has_servers(&self) -> bool {
!self.servers.is_empty()
}
/// Call a tool on a specific server.
///
/// # Concurrency
///
/// Multiple concurrent calls to the same server will serialize at the
/// transport layer (`StdioTransport` holds a mutex over stdin writes and
/// uses a shared pending-response map for JSON-RPC correlation). This is
/// intentional: JSON-RPC over stdio is a byte stream, and interleaved
/// writes would corrupt messages. Calls to *different* servers run fully
/// in parallel since each has its own transport.
pub async fn call_tool(
&self,
server_name: &str,
tool_name: &str,
arguments: &serde_json::Value,
) -> Result<McpToolResult> {
let client = self
.servers
.get(server_name)
.ok_or_else(|| anyhow!("MCP server '{}' not found or not running", server_name))?;
client.call_tool(tool_name, arguments).await
}
/// Convert an MCP tool result into text suitable for a tool result message.
/// Images are returned separately for multimodal attachment. Audio is
/// attached through the same channel — adapters that don't support audio
/// will silently drop it. Resource links + embedded resources render as
/// text so the model can follow up with another tool call.
pub fn format_tool_result(result: &McpToolResult) -> (String, Option<Vec<String>>) {
let mut text_parts = Vec::new();
let mut images = Vec::new();
for block in &result.content {
match block {
ContentBlock::Text(text) => text_parts.push(text.clone()),
ContentBlock::Image { data, .. } => images.push(data.clone()),
ContentBlock::Audio { data, mime_type } => {
images.push(data.clone());
text_parts.push(format!("[audio attachment: {}]", mime_type));
},
ContentBlock::ResourceLink {
uri,
name,
description,
mime_type,
} => {
let label = name.as_deref().unwrap_or(uri.as_str());
let desc = description.as_deref().unwrap_or("");
let mime = mime_type.as_deref().unwrap_or("");
text_parts.push(format!(
"[resource link: {} ({}) — {} → {}]",
label, mime, desc, uri
));
},
ContentBlock::Resource {
uri,
mime_type,
text,
blob,
} => {
let mime = mime_type.as_deref().unwrap_or("");
if let Some(t) = text {
text_parts.push(format!("[resource {}]:\n{}", uri, t));
} else if let Some(b) = blob {
text_parts.push(format!(
"[resource {} ({}): {} bytes of base64]",
uri,
mime,
b.len()
));
} else {
text_parts.push(format!("[resource {} ({})]", uri, mime));
}
},
}
}
let text = if text_parts.is_empty() {
if result.is_error {
"MCP tool returned an error with no message".to_string()
} else {
"MCP tool returned no text content".to_string()
}
} else {
text_parts.join("\n")
};
let images = if images.is_empty() {
None
} else {
Some(images)
};
(text, images)
}
/// Gracefully shut down all MCP servers.
pub async fn shutdown(&self) {
for (name, client) in &self.servers {
info!("Shutting down MCP server: {}", name);
client.shutdown().await;
}
}
}