Skip to main content

codineer_runtime/mcp_stdio/
manager.rs

1use std::collections::BTreeMap;
2
3use serde_json::Value as JsonValue;
4
5use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
6use crate::mcp::mcp_tool_name;
7use crate::mcp_client::{McpClientBootstrap, McpClientTransport};
8use crate::mcp_remote::McpRemoteClient;
9
10use super::process::{default_initialize_params, spawn_mcp_stdio_process, McpStdioProcess};
11use super::types::{
12    JsonRpcId, JsonRpcResponse, ManagedMcpTool, McpListToolsParams, McpServerManagerError,
13    McpToolCallParams, McpToolCallResult, UnsupportedMcpServer,
14};
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub(crate) struct ToolRoute {
18    pub(crate) server_name: String,
19    pub(crate) raw_name: String,
20}
21
22#[derive(Debug)]
23pub(crate) enum McpServerProcess {
24    Stdio(Box<McpStdioProcess>),
25    Remote(Box<McpRemoteClient>),
26}
27
28#[derive(Debug)]
29pub(crate) struct ManagedMcpServer {
30    pub(crate) bootstrap: McpClientBootstrap,
31    pub(crate) process: Option<McpServerProcess>,
32    pub(crate) initialized: bool,
33}
34
35impl ManagedMcpServer {
36    pub(crate) fn new(bootstrap: McpClientBootstrap) -> Self {
37        Self {
38            bootstrap,
39            process: None,
40            initialized: false,
41        }
42    }
43}
44
45#[derive(Debug)]
46pub struct McpServerManager {
47    servers: BTreeMap<String, ManagedMcpServer>,
48    unsupported_servers: Vec<UnsupportedMcpServer>,
49    tool_index: BTreeMap<String, ToolRoute>,
50    next_request_id: u64,
51}
52
53impl McpServerManager {
54    #[must_use]
55    pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
56        Self::from_servers(config.mcp().servers())
57    }
58
59    #[must_use]
60    pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
61        let mut managed_servers = BTreeMap::new();
62        let mut unsupported_servers = Vec::new();
63
64        for (server_name, server_config) in servers {
65            let transport = server_config.transport();
66            match transport {
67                McpTransport::Stdio | McpTransport::Sse | McpTransport::Http | McpTransport::Ws => {
68                    let bootstrap =
69                        McpClientBootstrap::from_scoped_config(server_name, server_config);
70                    managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
71                }
72                McpTransport::Sdk | McpTransport::ManagedProxy => {
73                    unsupported_servers.push(UnsupportedMcpServer {
74                        server_name: server_name.clone(),
75                        transport,
76                        reason: format!(
77                            "transport {transport:?} is not supported by McpServerManager"
78                        ),
79                    });
80                }
81            }
82        }
83
84        Self {
85            servers: managed_servers,
86            unsupported_servers,
87            tool_index: BTreeMap::new(),
88            next_request_id: 1,
89        }
90    }
91
92    #[must_use]
93    pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
94        &self.unsupported_servers
95    }
96
97    pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
98        let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
99        let mut discovered_tools = Vec::new();
100
101        for server_name in server_names {
102            self.ensure_server_ready(&server_name).await?;
103            self.clear_routes_for_server(&server_name);
104
105            let mut cursor = None;
106            loop {
107                let request_id = self.take_request_id();
108                let response = {
109                    let server = self.server_mut(&server_name)?;
110                    let process = server.process.as_mut().ok_or_else(|| {
111                        McpServerManagerError::InvalidResponse {
112                            server_name: server_name.clone(),
113                            method: "tools/list",
114                            details: "server process missing after initialization".to_string(),
115                        }
116                    })?;
117                    let params = Some(McpListToolsParams {
118                        cursor: cursor.clone(),
119                    });
120                    match process {
121                        McpServerProcess::Stdio(p) => p.list_tools(request_id, params).await?,
122                        McpServerProcess::Remote(c) => c.list_tools(request_id, params).await?,
123                    }
124                };
125
126                if let Some(error) = response.error {
127                    return Err(McpServerManagerError::JsonRpc {
128                        server_name: server_name.clone(),
129                        method: "tools/list",
130                        error,
131                    });
132                }
133
134                let result =
135                    response
136                        .result
137                        .ok_or_else(|| McpServerManagerError::InvalidResponse {
138                            server_name: server_name.clone(),
139                            method: "tools/list",
140                            details: "missing result payload".to_string(),
141                        })?;
142
143                for tool in result.tools {
144                    let qualified_name = mcp_tool_name(&server_name, &tool.name);
145                    self.tool_index.insert(
146                        qualified_name.clone(),
147                        ToolRoute {
148                            server_name: server_name.clone(),
149                            raw_name: tool.name.clone(),
150                        },
151                    );
152                    discovered_tools.push(ManagedMcpTool {
153                        server_name: server_name.clone(),
154                        qualified_name,
155                        raw_name: tool.name.clone(),
156                        tool,
157                    });
158                }
159
160                match result.next_cursor {
161                    Some(next_cursor) => cursor = Some(next_cursor),
162                    None => break,
163                }
164            }
165        }
166
167        Ok(discovered_tools)
168    }
169
170    pub async fn call_tool(
171        &mut self,
172        qualified_tool_name: &str,
173        arguments: Option<JsonValue>,
174    ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
175        let route = self
176            .tool_index
177            .get(qualified_tool_name)
178            .cloned()
179            .ok_or_else(|| McpServerManagerError::UnknownTool {
180                qualified_name: qualified_tool_name.to_string(),
181            })?;
182
183        self.ensure_server_ready(&route.server_name).await?;
184        let request_id = self.take_request_id();
185        let params = McpToolCallParams {
186            name: route.raw_name,
187            arguments,
188            meta: None,
189        };
190        let response =
191            {
192                let server = self.server_mut(&route.server_name)?;
193                let process = server.process.as_mut().ok_or_else(|| {
194                    McpServerManagerError::InvalidResponse {
195                        server_name: route.server_name.clone(),
196                        method: "tools/call",
197                        details: "server process missing after initialization".to_string(),
198                    }
199                })?;
200                match process {
201                    McpServerProcess::Stdio(p) => p.call_tool(request_id, params).await?,
202                    McpServerProcess::Remote(c) => c.call_tool(request_id, params).await?,
203                }
204            };
205        Ok(response)
206    }
207
208    pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
209        let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
210        for server_name in server_names {
211            let server = self.server_mut(&server_name)?;
212            if let Some(process) = server.process.as_mut() {
213                match process {
214                    McpServerProcess::Stdio(p) => p.shutdown().await?,
215                    McpServerProcess::Remote(c) => c.shutdown().await?,
216                }
217            }
218            server.process = None;
219            server.initialized = false;
220        }
221        Ok(())
222    }
223
224    fn clear_routes_for_server(&mut self, server_name: &str) {
225        self.tool_index
226            .retain(|_, route| route.server_name != server_name);
227    }
228
229    fn server_mut(
230        &mut self,
231        server_name: &str,
232    ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
233        self.servers
234            .get_mut(server_name)
235            .ok_or_else(|| McpServerManagerError::UnknownServer {
236                server_name: server_name.to_string(),
237            })
238    }
239
240    fn take_request_id(&mut self) -> JsonRpcId {
241        let id = self.next_request_id;
242        self.next_request_id = self.next_request_id.saturating_add(1);
243        JsonRpcId::Number(id)
244    }
245
246    async fn ensure_server_ready(
247        &mut self,
248        server_name: &str,
249    ) -> Result<(), McpServerManagerError> {
250        let needs_spawn = self
251            .servers
252            .get(server_name)
253            .map(|server| server.process.is_none())
254            .ok_or_else(|| McpServerManagerError::UnknownServer {
255                server_name: server_name.to_string(),
256            })?;
257
258        if needs_spawn {
259            let server = self.server_mut(server_name)?;
260            let process = match &server.bootstrap.transport {
261                McpClientTransport::Stdio(_) => {
262                    McpServerProcess::Stdio(Box::new(spawn_mcp_stdio_process(&server.bootstrap)?))
263                }
264                McpClientTransport::Sse(_)
265                | McpClientTransport::Http(_)
266                | McpClientTransport::WebSocket(_) => McpServerProcess::Remote(Box::new(
267                    McpRemoteClient::connect(&server.bootstrap)
268                        .await
269                        .map_err(|e| McpServerManagerError::SpawnFailed {
270                            server_name: server_name.to_string(),
271                            source: e,
272                        })?,
273                )),
274                other => {
275                    return Err(McpServerManagerError::InvalidResponse {
276                        server_name: server_name.to_string(),
277                        method: "connect",
278                        details: format!("transport {other:?} not supported"),
279                    });
280                }
281            };
282            server.process = Some(process);
283            server.initialized = false;
284        }
285
286        let needs_initialize = self
287            .servers
288            .get(server_name)
289            .map(|server| !server.initialized)
290            .ok_or_else(|| McpServerManagerError::UnknownServer {
291                server_name: server_name.to_string(),
292            })?;
293
294        if needs_initialize {
295            let request_id = self.take_request_id();
296            let params = default_initialize_params();
297            let response = {
298                let server = self.server_mut(server_name)?;
299                let process = server.process.as_mut().ok_or_else(|| {
300                    McpServerManagerError::InvalidResponse {
301                        server_name: server_name.to_string(),
302                        method: "initialize",
303                        details: "server process missing before initialize".to_string(),
304                    }
305                })?;
306                match process {
307                    McpServerProcess::Stdio(p) => p.initialize(request_id, params).await?,
308                    McpServerProcess::Remote(c) => c.initialize(request_id, params).await?,
309                }
310            };
311
312            if let Some(error) = response.error {
313                return Err(McpServerManagerError::JsonRpc {
314                    server_name: server_name.to_string(),
315                    method: "initialize",
316                    error,
317                });
318            }
319
320            if response.result.is_none() {
321                return Err(McpServerManagerError::InvalidResponse {
322                    server_name: server_name.to_string(),
323                    method: "initialize",
324                    details: "missing result payload".to_string(),
325                });
326            }
327
328            let server = self.server_mut(server_name)?;
329            server.initialized = true;
330        }
331
332        Ok(())
333    }
334}