1use std::collections::BTreeMap;
2use std::io;
3use std::process::Stdio;
4
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::process::{Child, ChildStdin, ChildStdout, Command};
10
11use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
12use crate::mcp::mcp_tool_name;
13use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
14
/// Identifier carried by JSON-RPC requests and responses.
///
/// The enum is `untagged`, so each variant is (de)serialized as the bare
/// value it wraps rather than as a tagged object.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub enum JsonRpcId {
    /// Numeric identifier.
    Number(u64),
    /// String identifier.
    String(String),
    /// Explicit null identifier.
    Null,
}
22
/// A JSON-RPC request envelope, generic over the type of its `params`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonRpcRequest<T = JsonValue> {
    /// Protocol version string; `JsonRpcRequest::new` sets it to `"2.0"`.
    pub jsonrpc: String,
    /// Identifier of this request.
    pub id: JsonRpcId,
    /// Name of the method being invoked.
    pub method: String,
    /// Method parameters; the key is omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<T>,
}
31
32impl<T> JsonRpcRequest<T> {
33 #[must_use]
34 pub fn new(id: JsonRpcId, method: impl Into<String>, params: Option<T>) -> Self {
35 Self {
36 jsonrpc: "2.0".to_string(),
37 id,
38 method: method.into(),
39 params,
40 }
41 }
42}
43
/// Error object carried in the `error` member of a JSON-RPC response.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonRpcError {
    /// Numeric error code.
    pub code: i64,
    /// Human-readable error message.
    pub message: String,
    /// Optional additional error payload; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<JsonValue>,
}
51
/// A JSON-RPC response envelope, generic over the type of its `result`.
///
/// Both `result` and `error` are optional at the type level; callers in this
/// file check `error` first and then require `result` to be present.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonRpcResponse<T = JsonValue> {
    /// Protocol version string.
    pub jsonrpc: String,
    /// Identifier of the request this response answers.
    pub id: JsonRpcId,
    /// Successful result payload; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<T>,
    /// Error payload; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
}
61
/// Parameters sent with the `initialize` request (camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeParams {
    /// Protocol version announced by the client (`protocolVersion`).
    pub protocol_version: String,
    /// Client capabilities as free-form JSON.
    pub capabilities: JsonValue,
    /// Name and version of the client (`clientInfo`).
    pub client_info: McpInitializeClientInfo,
}
69
/// Client identification embedded in [`McpInitializeParams`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeClientInfo {
    /// Client name.
    pub name: String,
    /// Client version string.
    pub version: String,
}
76
/// Result payload of the `initialize` request (camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeResult {
    /// Protocol version reported by the server (`protocolVersion`).
    pub protocol_version: String,
    /// Server capabilities as free-form JSON.
    pub capabilities: JsonValue,
    /// Name and version of the server (`serverInfo`).
    pub server_info: McpInitializeServerInfo,
}
84
/// Server identification embedded in [`McpInitializeResult`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeServerInfo {
    /// Server name.
    pub name: String,
    /// Server version string.
    pub version: String,
}
91
/// Parameters of the `tools/list` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListToolsParams {
    /// Pagination cursor taken from a previous result; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
98
/// A tool description as listed in [`McpListToolsResult`].
///
/// Every optional field is omitted from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpTool {
    /// Tool name as reported by the server.
    pub name: String,
    /// Optional human-readable description.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional schema of the tool arguments, serialized as `inputSchema`.
    #[serde(rename = "inputSchema", skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<JsonValue>,
    /// Optional free-form annotations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub annotations: Option<JsonValue>,
    /// Optional metadata, serialized as `_meta`.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
111
/// Result payload of the `tools/list` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListToolsResult {
    /// Tools contained in this page.
    pub tools: Vec<McpTool>,
    /// Cursor for the next page (`nextCursor`); `None` ends pagination in
    /// `McpServerManager::discover_tools`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
119
/// Parameters of the `tools/call` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpToolCallParams {
    /// Name of the tool to invoke, as known to the server.
    pub name: String,
    /// Optional tool arguments; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub arguments: Option<JsonValue>,
    /// Optional metadata, serialized as `_meta`; omitted when `None`.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
129
/// One content item of a tool call result.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpToolCallContent {
    /// Content discriminator, stored under the JSON key `type`.
    #[serde(rename = "type")]
    pub kind: String,
    /// All remaining keys of the content object, flattened into this map.
    #[serde(flatten)]
    pub data: BTreeMap<String, JsonValue>,
}
137
/// Result payload of the `tools/call` request (camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpToolCallResult {
    /// Content items; defaults to an empty list when the key is absent.
    #[serde(default)]
    pub content: Vec<McpToolCallContent>,
    /// Optional structured payload (`structuredContent`).
    #[serde(default)]
    pub structured_content: Option<JsonValue>,
    /// Optional error flag reported by the tool (`isError`).
    #[serde(default)]
    pub is_error: Option<bool>,
    /// Optional metadata, serialized as `_meta`; omitted when `None`.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
150
/// Parameters of the `resources/list` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListResourcesParams {
    /// Pagination cursor; omitted from the output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
157
/// A resource description as listed in [`McpListResourcesResult`].
///
/// Every optional field is omitted from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpResource {
    /// URI identifying the resource.
    pub uri: String,
    /// Optional resource name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional human-readable description.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional MIME type, serialized as `mimeType`.
    #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
    pub mime_type: Option<String>,
    /// Optional free-form annotations.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub annotations: Option<JsonValue>,
    /// Optional metadata, serialized as `_meta`.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
172
/// Result payload of the `resources/list` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListResourcesResult {
    /// Resources contained in this page.
    pub resources: Vec<McpResource>,
    /// Cursor for the next page (`nextCursor`); omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
180
/// Parameters of the `resources/read` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpReadResourceParams {
    /// URI of the resource to read.
    pub uri: String,
}
186
/// One contents entry of a [`McpReadResourceResult`].
///
/// Every optional field is omitted from the serialized output when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpResourceContents {
    /// URI of the resource these contents belong to.
    pub uri: String,
    /// Optional MIME type, serialized as `mimeType`.
    #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
    pub mime_type: Option<String>,
    /// Optional textual contents.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    /// Optional blob contents carried as a string.
    // NOTE(review): presumably base64-encoded binary data; confirm against the protocol spec.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub blob: Option<String>,
    /// Optional metadata, serialized as `_meta`.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
199
/// Result payload of the `resources/read` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpReadResourceResult {
    /// Contents entries returned for the requested resource.
    pub contents: Vec<McpResourceContents>,
}
204
/// A tool discovered by [`McpServerManager::discover_tools`], together with
/// the routing information needed to call it.
#[derive(Debug, Clone, PartialEq)]
pub struct ManagedMcpTool {
    /// Name of the configured server that exposes the tool.
    pub server_name: String,
    /// Name produced by `mcp_tool_name(server_name, raw_name)`; this is the
    /// key accepted by `McpServerManager::call_tool`.
    pub qualified_name: String,
    /// Tool name exactly as reported by the server.
    pub raw_name: String,
    /// Full tool description returned by the server.
    pub tool: McpTool,
}
212
/// A configured server the manager cannot drive because its transport is
/// not stdio.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsupportedMcpServer {
    /// Name of the configured server.
    pub server_name: String,
    /// Transport declared for the server.
    pub transport: McpTransport,
    /// Human-readable explanation of why the server was skipped.
    pub reason: String,
}
219
/// Errors returned by [`McpServerManager`] operations.
#[derive(Debug)]
pub enum McpServerManagerError {
    /// An I/O failure while spawning a server process or talking to it.
    Io(io::Error),
    /// The server answered `method` with a JSON-RPC error object.
    JsonRpc {
        server_name: String,
        method: &'static str,
        error: JsonRpcError,
    },
    /// The response to `method` was unusable (for example a missing result
    /// payload) or the server process was unexpectedly absent.
    InvalidResponse {
        server_name: String,
        method: &'static str,
        details: String,
    },
    /// `call_tool` was given a name that is not in the tool index.
    UnknownTool {
        qualified_name: String,
    },
    /// No managed server is registered under this name.
    UnknownServer {
        server_name: String,
    },
}
240
241impl std::fmt::Display for McpServerManagerError {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 Self::Io(error) => write!(f, "{error}"),
245 Self::JsonRpc {
246 server_name,
247 method,
248 error,
249 } => write!(
250 f,
251 "MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})",
252 error.message, error.code
253 ),
254 Self::InvalidResponse {
255 server_name,
256 method,
257 details,
258 } => write!(
259 f,
260 "MCP server `{server_name}` returned invalid response for {method}: {details}"
261 ),
262 Self::UnknownTool { qualified_name } => {
263 write!(f, "unknown MCP tool `{qualified_name}`")
264 }
265 Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"),
266 }
267 }
268}
269
270impl std::error::Error for McpServerManagerError {
271 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
272 match self {
273 Self::Io(error) => Some(error),
274 Self::JsonRpc { .. }
275 | Self::InvalidResponse { .. }
276 | Self::UnknownTool { .. }
277 | Self::UnknownServer { .. } => None,
278 }
279 }
280}
281
282impl From<io::Error> for McpServerManagerError {
283 fn from(value: io::Error) -> Self {
284 Self::Io(value)
285 }
286}
287
/// Routing entry of the tool index: which server owns a qualified tool name
/// and what the tool is called on that server.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToolRoute {
    /// Name of the managed server exposing the tool.
    server_name: String,
    /// Tool name as reported by that server.
    raw_name: String,
}
293
/// Per-server state tracked by [`McpServerManager`].
#[derive(Debug)]
struct ManagedMcpServer {
    /// Bootstrap description used to spawn the server process.
    bootstrap: McpClientBootstrap,
    /// Running child process; `None` until spawned and again after shutdown.
    process: Option<McpStdioProcess>,
    /// Set once the `initialize` request succeeded on the current process.
    initialized: bool,
}
300
301impl ManagedMcpServer {
302 fn new(bootstrap: McpClientBootstrap) -> Self {
303 Self {
304 bootstrap,
305 process: None,
306 initialized: false,
307 }
308 }
309}
310
/// Owns the stdio MCP servers from the configuration, spawns them on demand
/// and routes tool calls to the server that exposes each tool.
#[derive(Debug)]
pub struct McpServerManager {
    /// Managed stdio servers keyed by their configured name.
    servers: BTreeMap<String, ManagedMcpServer>,
    /// Configured servers skipped because their transport is not stdio.
    unsupported_servers: Vec<UnsupportedMcpServer>,
    /// Qualified tool name -> owning server and raw tool name.
    tool_index: BTreeMap<String, ToolRoute>,
    /// Next JSON-RPC request id to hand out; shared across all servers.
    next_request_id: u64,
}
318
319impl McpServerManager {
320 #[must_use]
321 pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
322 Self::from_servers(config.mcp().servers())
323 }
324
325 #[must_use]
326 pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
327 let mut managed_servers = BTreeMap::new();
328 let mut unsupported_servers = Vec::new();
329
330 for (server_name, server_config) in servers {
331 if server_config.transport() == McpTransport::Stdio {
332 let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config);
333 managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
334 } else {
335 unsupported_servers.push(UnsupportedMcpServer {
336 server_name: server_name.clone(),
337 transport: server_config.transport(),
338 reason: format!(
339 "transport {:?} is not supported by McpServerManager",
340 server_config.transport()
341 ),
342 });
343 }
344 }
345
346 Self {
347 servers: managed_servers,
348 unsupported_servers,
349 tool_index: BTreeMap::new(),
350 next_request_id: 1,
351 }
352 }
353
354 #[must_use]
355 pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
356 &self.unsupported_servers
357 }
358
359 pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
360 let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
361 let mut discovered_tools = Vec::new();
362
363 for server_name in server_names {
364 self.ensure_server_ready(&server_name).await?;
365 self.clear_routes_for_server(&server_name);
366
367 let mut cursor = None;
368 loop {
369 let request_id = self.take_request_id();
370 let response = {
371 let server = self.server_mut(&server_name)?;
372 let process = server.process.as_mut().ok_or_else(|| {
373 McpServerManagerError::InvalidResponse {
374 server_name: server_name.clone(),
375 method: "tools/list",
376 details: "server process missing after initialization".to_string(),
377 }
378 })?;
379 process
380 .list_tools(
381 request_id,
382 Some(McpListToolsParams {
383 cursor: cursor.clone(),
384 }),
385 )
386 .await?
387 };
388
389 if let Some(error) = response.error {
390 return Err(McpServerManagerError::JsonRpc {
391 server_name: server_name.clone(),
392 method: "tools/list",
393 error,
394 });
395 }
396
397 let result =
398 response
399 .result
400 .ok_or_else(|| McpServerManagerError::InvalidResponse {
401 server_name: server_name.clone(),
402 method: "tools/list",
403 details: "missing result payload".to_string(),
404 })?;
405
406 for tool in result.tools {
407 let qualified_name = mcp_tool_name(&server_name, &tool.name);
408 self.tool_index.insert(
409 qualified_name.clone(),
410 ToolRoute {
411 server_name: server_name.clone(),
412 raw_name: tool.name.clone(),
413 },
414 );
415 discovered_tools.push(ManagedMcpTool {
416 server_name: server_name.clone(),
417 qualified_name,
418 raw_name: tool.name.clone(),
419 tool,
420 });
421 }
422
423 match result.next_cursor {
424 Some(next_cursor) => cursor = Some(next_cursor),
425 None => break,
426 }
427 }
428 }
429
430 Ok(discovered_tools)
431 }
432
433 pub async fn call_tool(
434 &mut self,
435 qualified_tool_name: &str,
436 arguments: Option<JsonValue>,
437 ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
438 let route = self
439 .tool_index
440 .get(qualified_tool_name)
441 .cloned()
442 .ok_or_else(|| McpServerManagerError::UnknownTool {
443 qualified_name: qualified_tool_name.to_string(),
444 })?;
445
446 self.ensure_server_ready(&route.server_name).await?;
447 let request_id = self.take_request_id();
448 let response =
449 {
450 let server = self.server_mut(&route.server_name)?;
451 let process = server.process.as_mut().ok_or_else(|| {
452 McpServerManagerError::InvalidResponse {
453 server_name: route.server_name.clone(),
454 method: "tools/call",
455 details: "server process missing after initialization".to_string(),
456 }
457 })?;
458 process
459 .call_tool(
460 request_id,
461 McpToolCallParams {
462 name: route.raw_name,
463 arguments,
464 meta: None,
465 },
466 )
467 .await?
468 };
469 Ok(response)
470 }
471
472 pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
473 let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
474 for server_name in server_names {
475 let server = self.server_mut(&server_name)?;
476 if let Some(process) = server.process.as_mut() {
477 process.shutdown().await?;
478 }
479 server.process = None;
480 server.initialized = false;
481 }
482 Ok(())
483 }
484
485 fn clear_routes_for_server(&mut self, server_name: &str) {
486 self.tool_index
487 .retain(|_, route| route.server_name != server_name);
488 }
489
490 fn server_mut(
491 &mut self,
492 server_name: &str,
493 ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
494 self.servers
495 .get_mut(server_name)
496 .ok_or_else(|| McpServerManagerError::UnknownServer {
497 server_name: server_name.to_string(),
498 })
499 }
500
501 fn take_request_id(&mut self) -> JsonRpcId {
502 let id = self.next_request_id;
503 self.next_request_id = self.next_request_id.saturating_add(1);
504 JsonRpcId::Number(id)
505 }
506
507 async fn ensure_server_ready(
508 &mut self,
509 server_name: &str,
510 ) -> Result<(), McpServerManagerError> {
511 let needs_spawn = self
512 .servers
513 .get(server_name)
514 .map(|server| server.process.is_none())
515 .ok_or_else(|| McpServerManagerError::UnknownServer {
516 server_name: server_name.to_string(),
517 })?;
518
519 if needs_spawn {
520 let server = self.server_mut(server_name)?;
521 server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?);
522 server.initialized = false;
523 }
524
525 let needs_initialize = self
526 .servers
527 .get(server_name)
528 .map(|server| !server.initialized)
529 .ok_or_else(|| McpServerManagerError::UnknownServer {
530 server_name: server_name.to_string(),
531 })?;
532
533 if needs_initialize {
534 let request_id = self.take_request_id();
535 let response = {
536 let server = self.server_mut(server_name)?;
537 let process = server.process.as_mut().ok_or_else(|| {
538 McpServerManagerError::InvalidResponse {
539 server_name: server_name.to_string(),
540 method: "initialize",
541 details: "server process missing before initialize".to_string(),
542 }
543 })?;
544 process
545 .initialize(request_id, default_initialize_params())
546 .await?
547 };
548
549 if let Some(error) = response.error {
550 return Err(McpServerManagerError::JsonRpc {
551 server_name: server_name.to_string(),
552 method: "initialize",
553 error,
554 });
555 }
556
557 if response.result.is_none() {
558 return Err(McpServerManagerError::InvalidResponse {
559 server_name: server_name.to_string(),
560 method: "initialize",
561 details: "missing result payload".to_string(),
562 });
563 }
564
565 let server = self.server_mut(server_name)?;
566 server.initialized = true;
567 }
568
569 Ok(())
570 }
571}
572
/// A spawned MCP server child process together with its stdio pipes.
#[derive(Debug)]
pub struct McpStdioProcess {
    /// Handle of the child process.
    child: Child,
    /// Write end connected to the child's standard input.
    stdin: ChildStdin,
    /// Buffered read end connected to the child's standard output.
    stdout: BufReader<ChildStdout>,
}
579
580impl McpStdioProcess {
581 pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
582 let mut command = Command::new(&transport.command);
583 command
584 .args(&transport.args)
585 .stdin(Stdio::piped())
586 .stdout(Stdio::piped())
587 .stderr(Stdio::inherit());
588 apply_env(&mut command, &transport.env);
589
590 let mut child = command.spawn()?;
591 let stdin = child
592 .stdin
593 .take()
594 .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
595 let stdout = child
596 .stdout
597 .take()
598 .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
599
600 Ok(Self {
601 child,
602 stdin,
603 stdout: BufReader::new(stdout),
604 })
605 }
606
607 pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
608 self.stdin.write_all(bytes).await
609 }
610
611 pub async fn flush(&mut self) -> io::Result<()> {
612 self.stdin.flush().await
613 }
614
615 pub async fn write_line(&mut self, line: &str) -> io::Result<()> {
616 self.write_all(line.as_bytes()).await?;
617 self.write_all(b"\n").await?;
618 self.flush().await
619 }
620
621 pub async fn read_line(&mut self) -> io::Result<String> {
622 let mut line = String::new();
623 let bytes_read = self.stdout.read_line(&mut line).await?;
624 if bytes_read == 0 {
625 return Err(io::Error::new(
626 io::ErrorKind::UnexpectedEof,
627 "MCP stdio stream closed while reading line",
628 ));
629 }
630 Ok(line)
631 }
632
633 pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
634 let mut buffer = vec![0_u8; 4096];
635 let read = self.stdout.read(&mut buffer).await?;
636 buffer.truncate(read);
637 Ok(buffer)
638 }
639
640 pub async fn write_frame(&mut self, payload: &[u8]) -> io::Result<()> {
641 let encoded = encode_frame(payload);
642 self.write_all(&encoded).await?;
643 self.flush().await
644 }
645
646 pub async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
647 let mut content_length = None;
648 loop {
649 let mut line = String::new();
650 let bytes_read = self.stdout.read_line(&mut line).await?;
651 if bytes_read == 0 {
652 return Err(io::Error::new(
653 io::ErrorKind::UnexpectedEof,
654 "MCP stdio stream closed while reading headers",
655 ));
656 }
657 if line == "\r\n" {
658 break;
659 }
660 if let Some(value) = line.strip_prefix("Content-Length:") {
661 let parsed = value
662 .trim()
663 .parse::<usize>()
664 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
665 content_length = Some(parsed);
666 }
667 }
668
669 let content_length = content_length.ok_or_else(|| {
670 io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
671 })?;
672 let mut payload = vec![0_u8; content_length];
673 self.stdout.read_exact(&mut payload).await?;
674 Ok(payload)
675 }
676
677 pub async fn write_jsonrpc_message<T: Serialize>(&mut self, message: &T) -> io::Result<()> {
678 let body = serde_json::to_vec(message)
679 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
680 self.write_frame(&body).await
681 }
682
683 pub async fn read_jsonrpc_message<T: DeserializeOwned>(&mut self) -> io::Result<T> {
684 let payload = self.read_frame().await?;
685 serde_json::from_slice(&payload)
686 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
687 }
688
689 pub async fn send_request<T: Serialize>(
690 &mut self,
691 request: &JsonRpcRequest<T>,
692 ) -> io::Result<()> {
693 self.write_jsonrpc_message(request).await
694 }
695
696 pub async fn read_response<T: DeserializeOwned>(&mut self) -> io::Result<JsonRpcResponse<T>> {
697 self.read_jsonrpc_message().await
698 }
699
700 pub async fn request<TParams: Serialize, TResult: DeserializeOwned>(
701 &mut self,
702 id: JsonRpcId,
703 method: impl Into<String>,
704 params: Option<TParams>,
705 ) -> io::Result<JsonRpcResponse<TResult>> {
706 let request = JsonRpcRequest::new(id, method, params);
707 self.send_request(&request).await?;
708 self.read_response().await
709 }
710
711 pub async fn initialize(
712 &mut self,
713 id: JsonRpcId,
714 params: McpInitializeParams,
715 ) -> io::Result<JsonRpcResponse<McpInitializeResult>> {
716 self.request(id, "initialize", Some(params)).await
717 }
718
719 pub async fn list_tools(
720 &mut self,
721 id: JsonRpcId,
722 params: Option<McpListToolsParams>,
723 ) -> io::Result<JsonRpcResponse<McpListToolsResult>> {
724 self.request(id, "tools/list", params).await
725 }
726
727 pub async fn call_tool(
728 &mut self,
729 id: JsonRpcId,
730 params: McpToolCallParams,
731 ) -> io::Result<JsonRpcResponse<McpToolCallResult>> {
732 self.request(id, "tools/call", Some(params)).await
733 }
734
735 pub async fn list_resources(
736 &mut self,
737 id: JsonRpcId,
738 params: Option<McpListResourcesParams>,
739 ) -> io::Result<JsonRpcResponse<McpListResourcesResult>> {
740 self.request(id, "resources/list", params).await
741 }
742
743 pub async fn read_resource(
744 &mut self,
745 id: JsonRpcId,
746 params: McpReadResourceParams,
747 ) -> io::Result<JsonRpcResponse<McpReadResourceResult>> {
748 self.request(id, "resources/read", Some(params)).await
749 }
750
751 pub async fn terminate(&mut self) -> io::Result<()> {
752 self.child.kill().await
753 }
754
755 pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
756 self.child.wait().await
757 }
758
759 async fn shutdown(&mut self) -> io::Result<()> {
760 if self.child.try_wait()?.is_none() {
761 self.child.kill().await?;
762 }
763 let _ = self.child.wait().await?;
764 Ok(())
765 }
766}
767
768pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
769 match &bootstrap.transport {
770 McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
771 other => Err(io::Error::new(
772 io::ErrorKind::InvalidInput,
773 format!(
774 "MCP bootstrap transport for {} is not stdio: {other:?}",
775 bootstrap.server_name
776 ),
777 )),
778 }
779}
780
781fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
782 for (key, value) in env {
783 command.env(key, value);
784 }
785}
786
/// Prefixes `payload` with a `Content-Length: <len>` header followed by a
/// blank line, producing the bytes of one frame.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let prefix = format!("Content-Length: {}\r\n\r\n", payload.len());
    [prefix.as_bytes(), payload].concat()
}
793
794fn default_initialize_params() -> McpInitializeParams {
795 McpInitializeParams {
796 protocol_version: "2025-03-26".to_string(),
797 capabilities: JsonValue::Object(serde_json::Map::new()),
798 client_info: McpInitializeClientInfo {
799 name: "runtime".to_string(),
800 version: env!("CARGO_PKG_VERSION").to_string(),
801 },
802 }
803}
804
805#[cfg(test)]
806mod tests {
807 use std::collections::BTreeMap;
808 use std::fs;
809 use std::io::ErrorKind;
810 use std::os::unix::fs::PermissionsExt;
811 use std::path::{Path, PathBuf};
812 use std::time::{SystemTime, UNIX_EPOCH};
813
814 use serde_json::json;
815 use tokio::runtime::Builder;
816
817 use crate::config::{
818 ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig,
819 McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig,
820 };
821 use crate::mcp::mcp_tool_name;
822 use crate::mcp_client::McpClientBootstrap;
823
824 use super::{
825 spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
826 McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
827 McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
828 McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
829 };
830
831 fn temp_dir() -> PathBuf {
832 let nanos = SystemTime::now()
833 .duration_since(UNIX_EPOCH)
834 .expect("time should be after epoch")
835 .as_nanos();
836 std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}"))
837 }
838
839 fn write_echo_script() -> PathBuf {
840 let root = temp_dir();
841 fs::create_dir_all(&root).expect("temp dir");
842 let script_path = root.join("echo-mcp.sh");
843 fs::write(
844 &script_path,
845 "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
846 )
847 .expect("write script");
848 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
849 permissions.set_mode(0o755);
850 fs::set_permissions(&script_path, permissions).expect("chmod");
851 script_path
852 }
853
854 fn write_jsonrpc_script() -> PathBuf {
855 let root = temp_dir();
856 fs::create_dir_all(&root).expect("temp dir");
857 let script_path = root.join("jsonrpc-mcp.py");
858 let script = [
859 "#!/usr/bin/env python3",
860 "import json, sys",
861 "header = b''",
862 r"while not header.endswith(b'\r\n\r\n'):",
863 " chunk = sys.stdin.buffer.read(1)",
864 " if not chunk:",
865 " raise SystemExit(1)",
866 " header += chunk",
867 "length = 0",
868 r"for line in header.decode().split('\r\n'):",
869 r" if line.lower().startswith('content-length:'):",
870 r" length = int(line.split(':', 1)[1].strip())",
871 "payload = sys.stdin.buffer.read(length)",
872 "request = json.loads(payload.decode())",
873 r"assert request['jsonrpc'] == '2.0'",
874 r"assert request['method'] == 'initialize'",
875 r"response = json.dumps({",
876 r" 'jsonrpc': '2.0',",
877 r" 'id': request['id'],",
878 r" 'result': {",
879 r" 'protocolVersion': request['params']['protocolVersion'],",
880 r" 'capabilities': {'tools': {}},",
881 r" 'serverInfo': {'name': 'fake-mcp', 'version': '0.1.0'}",
882 r" }",
883 r"}).encode()",
884 r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)",
885 "sys.stdout.buffer.flush()",
886 "",
887 ]
888 .join("\n");
889 fs::write(&script_path, script).expect("write script");
890 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
891 permissions.set_mode(0o755);
892 fs::set_permissions(&script_path, permissions).expect("chmod");
893 script_path
894 }
895
896 #[allow(clippy::too_many_lines)]
897 fn write_mcp_server_script() -> PathBuf {
898 let root = temp_dir();
899 fs::create_dir_all(&root).expect("temp dir");
900 let script_path = root.join("fake-mcp-server.py");
901 let script = [
902 "#!/usr/bin/env python3",
903 "import json, sys",
904 "",
905 "def read_message():",
906 " header = b''",
907 r" while not header.endswith(b'\r\n\r\n'):",
908 " chunk = sys.stdin.buffer.read(1)",
909 " if not chunk:",
910 " return None",
911 " header += chunk",
912 " length = 0",
913 r" for line in header.decode().split('\r\n'):",
914 r" if line.lower().startswith('content-length:'):",
915 r" length = int(line.split(':', 1)[1].strip())",
916 " payload = sys.stdin.buffer.read(length)",
917 " return json.loads(payload.decode())",
918 "",
919 "def send_message(message):",
920 " payload = json.dumps(message).encode()",
921 r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
922 " sys.stdout.buffer.flush()",
923 "",
924 "while True:",
925 " request = read_message()",
926 " if request is None:",
927 " break",
928 " method = request['method']",
929 " if method == 'initialize':",
930 " send_message({",
931 " 'jsonrpc': '2.0',",
932 " 'id': request['id'],",
933 " 'result': {",
934 " 'protocolVersion': request['params']['protocolVersion'],",
935 " 'capabilities': {'tools': {}, 'resources': {}},",
936 " 'serverInfo': {'name': 'fake-mcp', 'version': '0.2.0'}",
937 " }",
938 " })",
939 " elif method == 'tools/list':",
940 " send_message({",
941 " 'jsonrpc': '2.0',",
942 " 'id': request['id'],",
943 " 'result': {",
944 " 'tools': [",
945 " {",
946 " 'name': 'echo',",
947 " 'description': 'Echoes text',",
948 " 'inputSchema': {",
949 " 'type': 'object',",
950 " 'properties': {'text': {'type': 'string'}},",
951 " 'required': ['text']",
952 " }",
953 " }",
954 " ]",
955 " }",
956 " })",
957 " elif method == 'tools/call':",
958 " args = request['params'].get('arguments') or {}",
959 " if request['params']['name'] == 'fail':",
960 " send_message({",
961 " 'jsonrpc': '2.0',",
962 " 'id': request['id'],",
963 " 'error': {'code': -32001, 'message': 'tool failed'},",
964 " })",
965 " else:",
966 " text = args.get('text', '')",
967 " send_message({",
968 " 'jsonrpc': '2.0',",
969 " 'id': request['id'],",
970 " 'result': {",
971 " 'content': [{'type': 'text', 'text': f'echo:{text}'}],",
972 " 'structuredContent': {'echoed': text},",
973 " 'isError': False",
974 " }",
975 " })",
976 " elif method == 'resources/list':",
977 " send_message({",
978 " 'jsonrpc': '2.0',",
979 " 'id': request['id'],",
980 " 'result': {",
981 " 'resources': [",
982 " {",
983 " 'uri': 'file://guide.txt',",
984 " 'name': 'guide',",
985 " 'description': 'Guide text',",
986 " 'mimeType': 'text/plain'",
987 " }",
988 " ]",
989 " }",
990 " })",
991 " elif method == 'resources/read':",
992 " uri = request['params']['uri']",
993 " send_message({",
994 " 'jsonrpc': '2.0',",
995 " 'id': request['id'],",
996 " 'result': {",
997 " 'contents': [",
998 " {",
999 " 'uri': uri,",
1000 " 'mimeType': 'text/plain',",
1001 " 'text': f'contents for {uri}'",
1002 " }",
1003 " ]",
1004 " }",
1005 " })",
1006 " else:",
1007 " send_message({",
1008 " 'jsonrpc': '2.0',",
1009 " 'id': request['id'],",
1010 " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
1011 " })",
1012 "",
1013 ]
1014 .join("\n");
1015 fs::write(&script_path, script).expect("write script");
1016 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
1017 permissions.set_mode(0o755);
1018 fs::set_permissions(&script_path, permissions).expect("chmod");
1019 script_path
1020 }
1021
1022 #[allow(clippy::too_many_lines)]
1023 fn write_manager_mcp_server_script() -> PathBuf {
1024 let root = temp_dir();
1025 fs::create_dir_all(&root).expect("temp dir");
1026 let script_path = root.join("manager-mcp-server.py");
1027 let script = [
1028 "#!/usr/bin/env python3",
1029 "import json, os, sys",
1030 "",
1031 "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')",
1032 "LOG_PATH = os.environ.get('MCP_LOG_PATH')",
1033 "initialize_count = 0",
1034 "",
1035 "def log(method):",
1036 " if LOG_PATH:",
1037 " with open(LOG_PATH, 'a', encoding='utf-8') as handle:",
1038 " handle.write(f'{method}\\n')",
1039 "",
1040 "def read_message():",
1041 " header = b''",
1042 r" while not header.endswith(b'\r\n\r\n'):",
1043 " chunk = sys.stdin.buffer.read(1)",
1044 " if not chunk:",
1045 " return None",
1046 " header += chunk",
1047 " length = 0",
1048 r" for line in header.decode().split('\r\n'):",
1049 r" if line.lower().startswith('content-length:'):",
1050 r" length = int(line.split(':', 1)[1].strip())",
1051 " payload = sys.stdin.buffer.read(length)",
1052 " return json.loads(payload.decode())",
1053 "",
1054 "def send_message(message):",
1055 " payload = json.dumps(message).encode()",
1056 r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
1057 " sys.stdout.buffer.flush()",
1058 "",
1059 "while True:",
1060 " request = read_message()",
1061 " if request is None:",
1062 " break",
1063 " method = request['method']",
1064 " log(method)",
1065 " if method == 'initialize':",
1066 " initialize_count += 1",
1067 " send_message({",
1068 " 'jsonrpc': '2.0',",
1069 " 'id': request['id'],",
1070 " 'result': {",
1071 " 'protocolVersion': request['params']['protocolVersion'],",
1072 " 'capabilities': {'tools': {}},",
1073 " 'serverInfo': {'name': LABEL, 'version': '1.0.0'}",
1074 " }",
1075 " })",
1076 " elif method == 'tools/list':",
1077 " send_message({",
1078 " 'jsonrpc': '2.0',",
1079 " 'id': request['id'],",
1080 " 'result': {",
1081 " 'tools': [",
1082 " {",
1083 " 'name': 'echo',",
1084 " 'description': f'Echo tool for {LABEL}',",
1085 " 'inputSchema': {",
1086 " 'type': 'object',",
1087 " 'properties': {'text': {'type': 'string'}},",
1088 " 'required': ['text']",
1089 " }",
1090 " }",
1091 " ]",
1092 " }",
1093 " })",
1094 " elif method == 'tools/call':",
1095 " args = request['params'].get('arguments') or {}",
1096 " text = args.get('text', '')",
1097 " send_message({",
1098 " 'jsonrpc': '2.0',",
1099 " 'id': request['id'],",
1100 " 'result': {",
1101 " 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],",
1102 " 'structuredContent': {",
1103 " 'server': LABEL,",
1104 " 'echoed': text,",
1105 " 'initializeCount': initialize_count",
1106 " },",
1107 " 'isError': False",
1108 " }",
1109 " })",
1110 " else:",
1111 " send_message({",
1112 " 'jsonrpc': '2.0',",
1113 " 'id': request['id'],",
1114 " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
1115 " })",
1116 "",
1117 ]
1118 .join("\n");
1119 fs::write(&script_path, script).expect("write script");
1120 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
1121 permissions.set_mode(0o755);
1122 fs::set_permissions(&script_path, permissions).expect("chmod");
1123 script_path
1124 }
1125
1126 fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
1127 let config = ScopedMcpServerConfig {
1128 scope: ConfigSource::Local,
1129 config: McpServerConfig::Stdio(McpStdioServerConfig {
1130 command: "/bin/sh".to_string(),
1131 args: vec![script_path.to_string_lossy().into_owned()],
1132 env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
1133 }),
1134 };
1135 McpClientBootstrap::from_scoped_config("stdio server", &config)
1136 }
1137
1138 fn script_transport(script_path: &Path) -> crate::mcp_client::McpStdioTransport {
1139 crate::mcp_client::McpStdioTransport {
1140 command: "python3".to_string(),
1141 args: vec![script_path.to_string_lossy().into_owned()],
1142 env: BTreeMap::new(),
1143 }
1144 }
1145
/// Deletes the script file and then the directory that contains it.
///
/// # Panics
///
/// Panics if the file cannot be removed, if the path has no parent, or if
/// the parent directory cannot be removed.
fn cleanup_script(script_path: &Path) {
    // The file is removed first so a missing script is reported as such
    // before the directory is touched.
    fs::remove_file(script_path).expect("cleanup script");

    let script_dir = script_path.parent().expect("script parent");
    fs::remove_dir_all(script_dir).expect("cleanup dir");
}
1150
1151 fn manager_server_config(
1152 script_path: &Path,
1153 label: &str,
1154 log_path: &Path,
1155 ) -> ScopedMcpServerConfig {
1156 ScopedMcpServerConfig {
1157 scope: ConfigSource::Local,
1158 config: McpServerConfig::Stdio(McpStdioServerConfig {
1159 command: "python3".to_string(),
1160 args: vec![script_path.to_string_lossy().into_owned()],
1161 env: BTreeMap::from([
1162 ("MCP_SERVER_LABEL".to_string(), label.to_string()),
1163 (
1164 "MCP_LOG_PATH".to_string(),
1165 log_path.to_string_lossy().into_owned(),
1166 ),
1167 ]),
1168 }),
1169 }
1170 }
1171
/// Spawns the echo script from a bootstrap, expects a `READY:` line carrying
/// the configured token, sends one line, expects it echoed back, and checks
/// that the child exits successfully.
#[test]
fn spawns_stdio_process_and_round_trips_io() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_echo_script();
            let bootstrap = sample_bootstrap(&script);
            let mut child = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");

            // First line announces readiness together with the env token.
            let ready_line = child.read_line().await.expect("read ready");
            assert_eq!(ready_line, "READY:secret-value\n");

            child
                .write_line("ping from client")
                .await
                .expect("write line");

            let echo_line = child.read_line().await.expect("read echo");
            assert_eq!(echo_line, "ECHO:ping from client\n");

            let exit = child.wait().await.expect("wait for exit");
            assert!(exit.success());

            cleanup_script(&script);
        });
}
1200
/// Spawning from a bootstrap built on an SDK server config must fail with
/// `ErrorKind::InvalidInput`.
#[test]
fn rejects_non_stdio_bootstrap() {
    let sdk_server = crate::config::McpSdkServerConfig {
        name: "sdk-server".to_string(),
    };
    let scoped = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Sdk(sdk_server),
    };
    let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &scoped);

    let failure = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");

    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
1213
/// Sends an `initialize` request to the JSON-RPC script and checks that the
/// typed response carries the same id, no error, and the expected server
/// info, after which the child exits successfully.
#[test]
fn round_trips_initialize_request_and_response_over_stdio_frames() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_jsonrpc_script();
            let transport = script_transport(&script);
            let mut server =
                McpStdioProcess::spawn(&transport).expect("spawn transport directly");

            let params = McpInitializeParams {
                protocol_version: "2025-03-26".to_string(),
                capabilities: json!({"roots": {}}),
                client_info: McpInitializeClientInfo {
                    name: "runtime-tests".to_string(),
                    version: "0.1.0".to_string(),
                },
            };
            let reply = server
                .initialize(JsonRpcId::Number(1), params)
                .await
                .expect("initialize roundtrip");

            assert_eq!(reply.id, JsonRpcId::Number(1));
            assert_eq!(reply.error, None);

            let expected = McpInitializeResult {
                protocol_version: "2025-03-26".to_string(),
                capabilities: json!({"tools": {}}),
                server_info: McpInitializeServerInfo {
                    name: "fake-mcp".to_string(),
                    version: "0.1.0".to_string(),
                },
            };
            assert_eq!(reply.result, Some(expected));

            let exit = server.wait().await.expect("wait for exit");
            assert!(exit.success());

            cleanup_script(&script);
        });
}
1260
/// Sends a hand-built `initialize` request with `send_request`, reads the
/// reply with `read_response`, and checks the reply's id and `jsonrpc`
/// version before confirming the child exits successfully.
#[test]
fn write_jsonrpc_request_emits_content_length_frame() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_jsonrpc_script();
            let transport = script_transport(&script);
            let mut server =
                McpStdioProcess::spawn(&transport).expect("spawn transport directly");

            let params = json!({
                "protocolVersion": "2025-03-26",
                "capabilities": {},
                "clientInfo": {"name": "runtime-tests", "version": "0.1.0"}
            });
            let request = JsonRpcRequest::new(JsonRpcId::Number(7), "initialize", Some(params));

            server.send_request(&request).await.expect("send request");
            let reply: JsonRpcResponse<serde_json::Value> =
                server.read_response().await.expect("read response");

            assert_eq!(reply.id, JsonRpcId::Number(7));
            assert_eq!(reply.jsonrpc, "2.0");

            let exit = server.wait().await.expect("wait for exit");
            assert!(exit.success());

            cleanup_script(&script);
        });
}
1294
/// Spawns the echo script straight from a transport description and checks
/// that the token placed in the transport's env map shows up in the
/// `READY:` output.
#[test]
fn direct_spawn_uses_transport_env() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_echo_script();

            let mut env = BTreeMap::new();
            env.insert("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string());
            let transport = crate::mcp_client::McpStdioTransport {
                command: "/bin/sh".to_string(),
                args: vec![script.to_string_lossy().into_owned()],
                env,
            };

            let mut child =
                McpStdioProcess::spawn(&transport).expect("spawn transport directly");
            let ready_bytes = child.read_available().await.expect("read ready");
            assert_eq!(
                String::from_utf8_lossy(&ready_bytes),
                "READY:direct-secret\n"
            );

            // The child is terminated explicitly, so its exit status is
            // collected but deliberately not inspected.
            child.terminate().await.expect("terminate child");
            let _ = child.wait().await.expect("wait after kill");

            cleanup_script(&script);
        });
}
1317
/// Drives the fake MCP server script through `list_tools`, `call_tool`,
/// `list_resources` and `read_resource`, checking each typed response.
#[test]
fn lists_tools_calls_tool_and_reads_resources_over_jsonrpc() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_mcp_server_script();
            let transport = script_transport(&script);
            let mut server = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");

            // Tool listing.
            let listed = server
                .list_tools(JsonRpcId::Number(2), None)
                .await
                .expect("list tools");
            assert_eq!(listed.error, None);
            assert_eq!(listed.id, JsonRpcId::Number(2));
            let echo_tool = McpTool {
                name: "echo".to_string(),
                description: Some("Echoes text".to_string()),
                input_schema: Some(json!({
                    "type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"]
                })),
                annotations: None,
                meta: None,
            };
            assert_eq!(
                listed.result,
                Some(McpListToolsResult {
                    tools: vec![echo_tool],
                    next_cursor: None,
                })
            );

            // Tool invocation.
            let call_params = McpToolCallParams {
                name: "echo".to_string(),
                arguments: Some(json!({"text": "hello"})),
                meta: None,
            };
            let called = server
                .call_tool(JsonRpcId::String("call-1".to_string()), call_params)
                .await
                .expect("call tool");
            assert_eq!(called.error, None);
            let outcome = called.result.expect("tool result");
            assert_eq!(outcome.is_error, Some(false));
            assert_eq!(outcome.structured_content, Some(json!({"echoed": "hello"})));
            assert_eq!(outcome.content.len(), 1);
            let first_item = &outcome.content[0];
            assert_eq!(first_item.kind, "text");
            assert_eq!(first_item.data.get("text"), Some(&json!("echo:hello")));

            // Resource listing.
            let listing = server
                .list_resources(JsonRpcId::Number(3), None)
                .await
                .expect("list resources");
            let listing_result = listing.result.expect("resources result");
            assert_eq!(listing_result.resources.len(), 1);
            let guide = &listing_result.resources[0];
            assert_eq!(guide.uri, "file://guide.txt");
            assert_eq!(guide.mime_type.as_deref(), Some("text/plain"));

            // Resource read.
            let read_params = McpReadResourceParams {
                uri: "file://guide.txt".to_string(),
            };
            let read = server
                .read_resource(JsonRpcId::Number(4), read_params)
                .await
                .expect("read resource");
            let expected_contents = super::McpResourceContents {
                uri: "file://guide.txt".to_string(),
                mime_type: Some("text/plain".to_string()),
                text: Some("contents for file://guide.txt".to_string()),
                blob: None,
                meta: None,
            };
            assert_eq!(
                read.result,
                Some(McpReadResourceResult {
                    contents: vec![expected_contents],
                })
            );

            server.terminate().await.expect("terminate child");
            let _ = server.wait().await.expect("wait after kill");
            cleanup_script(&script);
        });
}
1417
/// Calls the `fail` tool on the fake MCP server and checks that the
/// JSON-RPC error (code `-32001`, message `tool failed`) is returned in the
/// response rather than as a transport failure.
#[test]
fn surfaces_jsonrpc_errors_from_tool_calls() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_mcp_server_script();
            let transport = script_transport(&script);
            let mut server = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");

            let call_params = McpToolCallParams {
                name: "fail".to_string(),
                arguments: None,
                meta: None,
            };
            let reply = server
                .call_tool(JsonRpcId::Number(9), call_params)
                .await
                .expect("call tool with error response");

            assert_eq!(reply.id, JsonRpcId::Number(9));
            assert!(reply.result.is_none());

            let rpc_error = reply.error.as_ref();
            assert_eq!(rpc_error.map(|e| e.code), Some(-32001));
            assert_eq!(rpc_error.map(|e| e.message.as_str()), Some("tool failed"));

            server.terminate().await.expect("terminate child");
            let _ = server.wait().await.expect("wait after kill");
            cleanup_script(&script);
        });
}
1454
/// A manager built from one stdio server config discovers that server's
/// single `echo` tool, reports it under the qualified name, and records no
/// unsupported servers.
#[test]
fn manager_discovers_tools_from_stdio_config() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_manager_mcp_server_script();
            let log_path = script.parent().expect("script parent").join("alpha.log");

            let mut servers = BTreeMap::new();
            servers.insert(
                "alpha".to_string(),
                manager_server_config(&script, "alpha", &log_path),
            );
            let mut manager = McpServerManager::from_servers(&servers);

            let discovered = manager.discover_tools().await.expect("discover tools");

            assert_eq!(discovered.len(), 1);
            let found = &discovered[0];
            assert_eq!(found.server_name, "alpha");
            assert_eq!(found.raw_name, "echo");
            assert_eq!(found.qualified_name, mcp_tool_name("alpha", "echo"));
            assert_eq!(found.tool.name, "echo");
            assert!(manager.unsupported_servers().is_empty());

            manager.shutdown().await.expect("shutdown");
            cleanup_script(&script);
        });
}
1484
/// With two servers running the same script under different labels, a call
/// to each qualified tool name must be answered by the matching server, as
/// reported in the `server` field of the structured content.
#[test]
fn manager_routes_tool_calls_to_correct_server() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_manager_mcp_server_script();
            let script_dir = script.parent().expect("script parent");
            let alpha_log = script_dir.join("alpha.log");
            let beta_log = script_dir.join("beta.log");

            let mut servers = BTreeMap::new();
            servers.insert(
                "alpha".to_string(),
                manager_server_config(&script, "alpha", &alpha_log),
            );
            servers.insert(
                "beta".to_string(),
                manager_server_config(&script, "beta", &beta_log),
            );
            let mut manager = McpServerManager::from_servers(&servers);

            let discovered = manager.discover_tools().await.expect("discover tools");
            assert_eq!(discovered.len(), 2);

            let alpha_reply = manager
                .call_tool(
                    &mcp_tool_name("alpha", "echo"),
                    Some(json!({"text": "hello"})),
                )
                .await
                .expect("call alpha tool");
            let beta_reply = manager
                .call_tool(
                    &mcp_tool_name("beta", "echo"),
                    Some(json!({"text": "world"})),
                )
                .await
                .expect("call beta tool");

            let alpha_server = alpha_reply
                .result
                .as_ref()
                .and_then(|result| result.structured_content.as_ref())
                .and_then(|value| value.get("server"));
            assert_eq!(alpha_server, Some(&json!("alpha")));

            let beta_server = beta_reply
                .result
                .as_ref()
                .and_then(|result| result.structured_content.as_ref())
                .and_then(|value| value.get("server"));
            assert_eq!(beta_server, Some(&json!("beta")));

            manager.shutdown().await.expect("shutdown");
            cleanup_script(&script);
        });
}
1546
/// A manager built from HTTP, SDK and WebSocket server configs lists all
/// three as unsupported, in the order `http`, `sdk`, `ws`.
#[test]
fn manager_records_unsupported_non_stdio_servers_without_panicking() {
    let http_server = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Http(McpRemoteServerConfig {
            url: "https://example.test/mcp".to_string(),
            headers: BTreeMap::new(),
            headers_helper: None,
            oauth: None,
        }),
    };
    let sdk_server = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Sdk(McpSdkServerConfig {
            name: "sdk-server".to_string(),
        }),
    };
    let ws_server = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Ws(McpWebSocketServerConfig {
            url: "wss://example.test/mcp".to_string(),
            headers: BTreeMap::new(),
            headers_helper: None,
        }),
    };

    let servers = BTreeMap::from([
        ("http".to_string(), http_server),
        ("sdk".to_string(), sdk_server),
        ("ws".to_string(), ws_server),
    ]);

    let manager = McpServerManager::from_servers(&servers);
    let rejected = manager.unsupported_servers();

    assert_eq!(rejected.len(), 3);
    assert_eq!(rejected[0].server_name, "http");
    assert_eq!(rejected[1].server_name, "sdk");
    assert_eq!(rejected[2].server_name, "ws");
}
1592
/// After tool discovery, `shutdown` must succeed, and a second `shutdown`
/// on the same manager must succeed as well.
#[test]
fn manager_shutdown_terminates_spawned_children_and_is_idempotent() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_manager_mcp_server_script();
            let log_path = script.parent().expect("script parent").join("alpha.log");

            let mut servers = BTreeMap::new();
            servers.insert(
                "alpha".to_string(),
                manager_server_config(&script, "alpha", &log_path),
            );
            let mut manager = McpServerManager::from_servers(&servers);

            manager.discover_tools().await.expect("discover tools");

            // Shutting down twice in a row must not report an error.
            manager.shutdown().await.expect("first shutdown");
            manager.shutdown().await.expect("second shutdown");

            cleanup_script(&script);
        });
}
1616
/// Discovery followed by a tool call must see exactly one `initialize`:
/// the script reports `initializeCount` of 1 and its log lists
/// `initialize`, `tools/list`, `tools/call` in that order.
#[test]
fn manager_reuses_spawned_server_between_discovery_and_call() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_manager_mcp_server_script();
            let log_path = script.parent().expect("script parent").join("alpha.log");

            let mut servers = BTreeMap::new();
            servers.insert(
                "alpha".to_string(),
                manager_server_config(&script, "alpha", &log_path),
            );
            let mut manager = McpServerManager::from_servers(&servers);

            manager.discover_tools().await.expect("discover tools");
            let reply = manager
                .call_tool(
                    &mcp_tool_name("alpha", "echo"),
                    Some(json!({"text": "reuse"})),
                )
                .await
                .expect("call tool");

            let initialize_count = reply
                .result
                .as_ref()
                .and_then(|result| result.structured_content.as_ref())
                .and_then(|value| value.get("initializeCount"));
            assert_eq!(initialize_count, Some(&json!(1)));

            // The script appends one method name per request it receives.
            let log = fs::read_to_string(&log_path).expect("read log");
            let logged_methods: Vec<&str> = log.lines().collect();
            assert_eq!(
                logged_methods
                    .iter()
                    .filter(|method| **method == "initialize")
                    .count(),
                1
            );
            assert_eq!(
                logged_methods,
                vec!["initialize", "tools/list", "tools/call"]
            );

            manager.shutdown().await.expect("shutdown");
            cleanup_script(&script);
        });
}
1662
/// Calling a qualified tool name the manager does not know must fail with
/// `McpServerManagerError::UnknownTool` carrying that same qualified name.
#[test]
fn manager_reports_unknown_qualified_tool_name() {
    Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime")
        .block_on(async {
            let script = write_manager_mcp_server_script();
            let log_path = script.parent().expect("script parent").join("alpha.log");

            let mut servers = BTreeMap::new();
            servers.insert(
                "alpha".to_string(),
                manager_server_config(&script, "alpha", &log_path),
            );
            let mut manager = McpServerManager::from_servers(&servers);

            let failure = manager
                .call_tool(
                    &mcp_tool_name("alpha", "missing"),
                    Some(json!({"text": "nope"})),
                )
                .await
                .expect_err("unknown qualified tool should fail");

            match failure {
                McpServerManagerError::UnknownTool { qualified_name } => {
                    assert_eq!(qualified_name, mcp_tool_name("alpha", "missing"));
                }
                other => panic!("expected unknown tool error, got {other:?}"),
            }

            cleanup_script(&script);
        });
}
1697}