codineer_runtime/mcp_stdio/
manager.rs1use std::collections::BTreeMap;
2
3use serde_json::Value as JsonValue;
4
5use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
6use crate::mcp::mcp_tool_name;
7use crate::mcp_client::{McpClientBootstrap, McpClientTransport};
8use crate::mcp_remote::McpRemoteClient;
9
10use super::process::{default_initialize_params, spawn_mcp_stdio_process, McpStdioProcess};
11use super::types::{
12 JsonRpcId, JsonRpcResponse, ManagedMcpTool, McpListToolsParams, McpServerManagerError,
13 McpToolCallParams, McpToolCallResult, UnsupportedMcpServer,
14};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub(crate) struct ToolRoute {
18 pub(crate) server_name: String,
19 pub(crate) raw_name: String,
20}
21
22#[derive(Debug)]
23pub(crate) enum McpServerProcess {
24 Stdio(Box<McpStdioProcess>),
25 Remote(Box<McpRemoteClient>),
26}
27
28#[derive(Debug)]
29pub(crate) struct ManagedMcpServer {
30 pub(crate) bootstrap: McpClientBootstrap,
31 pub(crate) process: Option<McpServerProcess>,
32 pub(crate) initialized: bool,
33}
34
35impl ManagedMcpServer {
36 pub(crate) fn new(bootstrap: McpClientBootstrap) -> Self {
37 Self {
38 bootstrap,
39 process: None,
40 initialized: false,
41 }
42 }
43}
44
45#[derive(Debug)]
46pub struct McpServerManager {
47 servers: BTreeMap<String, ManagedMcpServer>,
48 unsupported_servers: Vec<UnsupportedMcpServer>,
49 tool_index: BTreeMap<String, ToolRoute>,
50 next_request_id: u64,
51}
52
53impl McpServerManager {
54 #[must_use]
55 pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
56 Self::from_servers(config.mcp().servers())
57 }
58
59 #[must_use]
60 pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
61 let mut managed_servers = BTreeMap::new();
62 let mut unsupported_servers = Vec::new();
63
64 for (server_name, server_config) in servers {
65 let transport = server_config.transport();
66 match transport {
67 McpTransport::Stdio | McpTransport::Sse | McpTransport::Http | McpTransport::Ws => {
68 let bootstrap =
69 McpClientBootstrap::from_scoped_config(server_name, server_config);
70 managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
71 }
72 McpTransport::Sdk | McpTransport::ManagedProxy => {
73 unsupported_servers.push(UnsupportedMcpServer {
74 server_name: server_name.clone(),
75 transport,
76 reason: format!(
77 "transport {transport:?} is not supported by McpServerManager"
78 ),
79 });
80 }
81 }
82 }
83
84 Self {
85 servers: managed_servers,
86 unsupported_servers,
87 tool_index: BTreeMap::new(),
88 next_request_id: 1,
89 }
90 }
91
92 #[must_use]
93 pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
94 &self.unsupported_servers
95 }
96
97 pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
98 let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
99 let mut discovered_tools = Vec::new();
100
101 for server_name in server_names {
102 self.ensure_server_ready(&server_name).await?;
103 self.clear_routes_for_server(&server_name);
104
105 let mut cursor = None;
106 loop {
107 let request_id = self.take_request_id();
108 let response = {
109 let server = self.server_mut(&server_name)?;
110 let process = server.process.as_mut().ok_or_else(|| {
111 McpServerManagerError::InvalidResponse {
112 server_name: server_name.clone(),
113 method: "tools/list",
114 details: "server process missing after initialization".to_string(),
115 }
116 })?;
117 let params = Some(McpListToolsParams {
118 cursor: cursor.clone(),
119 });
120 match process {
121 McpServerProcess::Stdio(p) => p.list_tools(request_id, params).await?,
122 McpServerProcess::Remote(c) => c.list_tools(request_id, params).await?,
123 }
124 };
125
126 if let Some(error) = response.error {
127 return Err(McpServerManagerError::JsonRpc {
128 server_name: server_name.clone(),
129 method: "tools/list",
130 error,
131 });
132 }
133
134 let result =
135 response
136 .result
137 .ok_or_else(|| McpServerManagerError::InvalidResponse {
138 server_name: server_name.clone(),
139 method: "tools/list",
140 details: "missing result payload".to_string(),
141 })?;
142
143 for tool in result.tools {
144 let qualified_name = mcp_tool_name(&server_name, &tool.name);
145 self.tool_index.insert(
146 qualified_name.clone(),
147 ToolRoute {
148 server_name: server_name.clone(),
149 raw_name: tool.name.clone(),
150 },
151 );
152 discovered_tools.push(ManagedMcpTool {
153 server_name: server_name.clone(),
154 qualified_name,
155 raw_name: tool.name.clone(),
156 tool,
157 });
158 }
159
160 match result.next_cursor {
161 Some(next_cursor) => cursor = Some(next_cursor),
162 None => break,
163 }
164 }
165 }
166
167 Ok(discovered_tools)
168 }
169
170 pub async fn call_tool(
171 &mut self,
172 qualified_tool_name: &str,
173 arguments: Option<JsonValue>,
174 ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
175 let route = self
176 .tool_index
177 .get(qualified_tool_name)
178 .cloned()
179 .ok_or_else(|| McpServerManagerError::UnknownTool {
180 qualified_name: qualified_tool_name.to_string(),
181 })?;
182
183 self.ensure_server_ready(&route.server_name).await?;
184 let request_id = self.take_request_id();
185 let params = McpToolCallParams {
186 name: route.raw_name,
187 arguments,
188 meta: None,
189 };
190 let response =
191 {
192 let server = self.server_mut(&route.server_name)?;
193 let process = server.process.as_mut().ok_or_else(|| {
194 McpServerManagerError::InvalidResponse {
195 server_name: route.server_name.clone(),
196 method: "tools/call",
197 details: "server process missing after initialization".to_string(),
198 }
199 })?;
200 match process {
201 McpServerProcess::Stdio(p) => p.call_tool(request_id, params).await?,
202 McpServerProcess::Remote(c) => c.call_tool(request_id, params).await?,
203 }
204 };
205 Ok(response)
206 }
207
208 pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
209 let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
210 for server_name in server_names {
211 let server = self.server_mut(&server_name)?;
212 if let Some(process) = server.process.as_mut() {
213 match process {
214 McpServerProcess::Stdio(p) => p.shutdown().await?,
215 McpServerProcess::Remote(c) => c.shutdown().await?,
216 }
217 }
218 server.process = None;
219 server.initialized = false;
220 }
221 Ok(())
222 }
223
224 fn clear_routes_for_server(&mut self, server_name: &str) {
225 self.tool_index
226 .retain(|_, route| route.server_name != server_name);
227 }
228
229 fn server_mut(
230 &mut self,
231 server_name: &str,
232 ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
233 self.servers
234 .get_mut(server_name)
235 .ok_or_else(|| McpServerManagerError::UnknownServer {
236 server_name: server_name.to_string(),
237 })
238 }
239
240 fn take_request_id(&mut self) -> JsonRpcId {
241 let id = self.next_request_id;
242 self.next_request_id = self.next_request_id.saturating_add(1);
243 JsonRpcId::Number(id)
244 }
245
246 async fn ensure_server_ready(
247 &mut self,
248 server_name: &str,
249 ) -> Result<(), McpServerManagerError> {
250 let needs_spawn = self
251 .servers
252 .get(server_name)
253 .map(|server| server.process.is_none())
254 .ok_or_else(|| McpServerManagerError::UnknownServer {
255 server_name: server_name.to_string(),
256 })?;
257
258 if needs_spawn {
259 let server = self.server_mut(server_name)?;
260 let process = match &server.bootstrap.transport {
261 McpClientTransport::Stdio(_) => {
262 McpServerProcess::Stdio(Box::new(spawn_mcp_stdio_process(&server.bootstrap)?))
263 }
264 McpClientTransport::Sse(_)
265 | McpClientTransport::Http(_)
266 | McpClientTransport::WebSocket(_) => McpServerProcess::Remote(Box::new(
267 McpRemoteClient::connect(&server.bootstrap)
268 .await
269 .map_err(|e| McpServerManagerError::SpawnFailed {
270 server_name: server_name.to_string(),
271 source: e,
272 })?,
273 )),
274 other => {
275 return Err(McpServerManagerError::InvalidResponse {
276 server_name: server_name.to_string(),
277 method: "connect",
278 details: format!("transport {other:?} not supported"),
279 });
280 }
281 };
282 server.process = Some(process);
283 server.initialized = false;
284 }
285
286 let needs_initialize = self
287 .servers
288 .get(server_name)
289 .map(|server| !server.initialized)
290 .ok_or_else(|| McpServerManagerError::UnknownServer {
291 server_name: server_name.to_string(),
292 })?;
293
294 if needs_initialize {
295 let request_id = self.take_request_id();
296 let params = default_initialize_params();
297 let response = {
298 let server = self.server_mut(server_name)?;
299 let process = server.process.as_mut().ok_or_else(|| {
300 McpServerManagerError::InvalidResponse {
301 server_name: server_name.to_string(),
302 method: "initialize",
303 details: "server process missing before initialize".to_string(),
304 }
305 })?;
306 match process {
307 McpServerProcess::Stdio(p) => p.initialize(request_id, params).await?,
308 McpServerProcess::Remote(c) => c.initialize(request_id, params).await?,
309 }
310 };
311
312 if let Some(error) = response.error {
313 return Err(McpServerManagerError::JsonRpc {
314 server_name: server_name.to_string(),
315 method: "initialize",
316 error,
317 });
318 }
319
320 if response.result.is_none() {
321 return Err(McpServerManagerError::InvalidResponse {
322 server_name: server_name.to_string(),
323 method: "initialize",
324 details: "missing result payload".to_string(),
325 });
326 }
327
328 let server = self.server_mut(server_name)?;
329 server.initialized = true;
330 }
331
332 Ok(())
333 }
334}