1use std::collections::BTreeMap;
2use std::io;
3use std::process::Stdio;
4
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::process::{Child, ChildStdin, ChildStdout, Command};
10
11use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
12use crate::mcp::mcp_tool_name;
13use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
14
/// Identifier carried by JSON-RPC requests and responses.
///
/// The enum is serialized untagged, so the wire form is a bare number, a bare
/// string, or `null` rather than an object naming the variant.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(untagged)]
pub enum JsonRpcId {
    /// Unsigned integer identifier.
    Number(u64),
    /// String identifier.
    String(String),
    /// Explicit `null` identifier.
    Null,
}
22
/// JSON-RPC request envelope, generic over the type of its `params` payload.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonRpcRequest<T = JsonValue> {
    /// Protocol version string; [`JsonRpcRequest::new`] sets it to `"2.0"`.
    pub jsonrpc: String,
    /// Identifier attached to this request.
    pub id: JsonRpcId,
    /// Name of the method being invoked.
    pub method: String,
    /// Method parameters; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub params: Option<T>,
}
31
32impl<T> JsonRpcRequest<T> {
33 #[must_use]
34 pub fn new(id: JsonRpcId, method: impl Into<String>, params: Option<T>) -> Self {
35 Self {
36 jsonrpc: "2.0".to_string(),
37 id,
38 method: method.into(),
39 params,
40 }
41 }
42}
43
/// Error object carried in the `error` member of a [`JsonRpcResponse`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonRpcError {
    /// Numeric error code reported by the peer.
    pub code: i64,
    /// Error description reported by the peer.
    pub message: String,
    /// Optional extra error payload; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub data: Option<JsonValue>,
}
51
/// JSON-RPC response envelope, generic over the type of its `result` payload.
///
/// Both `result` and `error` are optional at the type level; nothing in this
/// struct enforces that exactly one of them is present.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct JsonRpcResponse<T = JsonValue> {
    /// Protocol version string as sent by the peer.
    pub jsonrpc: String,
    /// Identifier carried by the response.
    pub id: JsonRpcId,
    /// Successful result payload; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub result: Option<T>,
    /// Error payload; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub error: Option<JsonRpcError>,
}
61
/// Parameters of the `initialize` request (field names are camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeParams {
    /// Protocol version offered by the client (`protocolVersion`).
    pub protocol_version: String,
    /// Client capabilities as free-form JSON.
    pub capabilities: JsonValue,
    /// Name and version of the client (`clientInfo`).
    pub client_info: McpInitializeClientInfo,
}
69
/// Client identification sent inside [`McpInitializeParams`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeClientInfo {
    /// Client name.
    pub name: String,
    /// Client version string.
    pub version: String,
}
76
/// Result payload of the `initialize` request (field names are camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeResult {
    /// Protocol version reported by the server (`protocolVersion`).
    pub protocol_version: String,
    /// Server capabilities as free-form JSON.
    pub capabilities: JsonValue,
    /// Name and version of the server (`serverInfo`).
    pub server_info: McpInitializeServerInfo,
}
84
/// Server identification returned inside [`McpInitializeResult`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct McpInitializeServerInfo {
    /// Server name.
    pub name: String,
    /// Server version string.
    pub version: String,
}
91
/// Parameters of the `tools/list` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListToolsParams {
    /// Pagination cursor from a previous page; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
98
/// One tool entry as returned by `tools/list`.
///
/// Every optional field is left out of the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpTool {
    /// Tool name as reported by the server.
    pub name: String,
    /// Optional description of the tool.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional schema for the tool arguments (`inputSchema` on the wire), kept as raw JSON.
    #[serde(rename = "inputSchema", skip_serializing_if = "Option::is_none")]
    pub input_schema: Option<JsonValue>,
    /// Optional annotations, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub annotations: Option<JsonValue>,
    /// Optional metadata (`_meta` on the wire), kept as raw JSON.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
111
/// Result payload of the `tools/list` request (field names are camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListToolsResult {
    /// Tools contained in this page.
    pub tools: Vec<McpTool>,
    /// Cursor for the next page (`nextCursor`); `None` means there are no further pages.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
119
/// Parameters of the `tools/call` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpToolCallParams {
    /// Name of the tool to invoke, as the server knows it.
    pub name: String,
    /// Tool arguments as raw JSON; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub arguments: Option<JsonValue>,
    /// Optional metadata (`_meta` on the wire); left out of the serialized form when `None`.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
129
/// One content item of a tool call result.
///
/// The `type` member is captured in `kind`; every other member of the JSON
/// object is collected into `data` via `#[serde(flatten)]`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpToolCallContent {
    /// Content type discriminator (`type` on the wire).
    #[serde(rename = "type")]
    pub kind: String,
    /// All remaining members of the content object, keyed by member name.
    #[serde(flatten)]
    pub data: BTreeMap<String, JsonValue>,
}
137
138#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
139#[serde(rename_all = "camelCase")]
140pub struct McpToolCallResult {
141 #[serde(default)]
142 pub content: Vec<McpToolCallContent>,
143 #[serde(default)]
144 pub structured_content: Option<JsonValue>,
145 #[serde(default)]
146 pub is_error: Option<bool>,
147 #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
148 pub meta: Option<JsonValue>,
149}
150
/// Parameters of the `resources/list` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListResourcesParams {
    /// Pagination cursor from a previous page; left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub cursor: Option<String>,
}
157
/// One resource entry as returned by `resources/list`.
///
/// Every optional field is left out of the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpResource {
    /// URI identifying the resource.
    pub uri: String,
    /// Optional resource name.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub name: Option<String>,
    /// Optional description of the resource.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional MIME type (`mimeType` on the wire).
    #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
    pub mime_type: Option<String>,
    /// Optional annotations, kept as raw JSON.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub annotations: Option<JsonValue>,
    /// Optional metadata (`_meta` on the wire), kept as raw JSON.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
172
/// Result payload of the `resources/list` request (field names are camelCase on the wire).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpListResourcesResult {
    /// Resources contained in this page.
    pub resources: Vec<McpResource>,
    /// Cursor for the next page (`nextCursor`); left out of the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub next_cursor: Option<String>,
}
180
/// Parameters of the `resources/read` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct McpReadResourceParams {
    /// URI of the resource to read.
    pub uri: String,
}
186
/// One contents entry returned by `resources/read`.
///
/// Every optional field is left out of the serialized form when `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpResourceContents {
    /// URI of the resource these contents belong to.
    pub uri: String,
    /// Optional MIME type (`mimeType` on the wire).
    #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
    pub mime_type: Option<String>,
    /// Optional `text` member of the contents.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub text: Option<String>,
    /// Optional `blob` member of the contents, stored as the string received.
    // NOTE(review): presumably base64-encoded binary data; confirm against the MCP schema.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub blob: Option<String>,
    /// Optional metadata (`_meta` on the wire), kept as raw JSON.
    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
    pub meta: Option<JsonValue>,
}
199
/// Result payload of the `resources/read` request.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct McpReadResourceResult {
    /// Contents entries returned for the requested resource.
    pub contents: Vec<McpResourceContents>,
}
204
/// A tool discovered by [`McpServerManager::discover_tools`], together with
/// the server it came from.
#[derive(Debug, Clone, PartialEq)]
pub struct ManagedMcpTool {
    /// Name of the configured server that reported the tool.
    pub server_name: String,
    /// Name produced by `mcp_tool_name(server_name, raw_name)`; this is the key
    /// accepted by [`McpServerManager::call_tool`].
    pub qualified_name: String,
    /// Tool name exactly as the server reported it.
    pub raw_name: String,
    /// Full tool description returned by the server.
    pub tool: McpTool,
}
212
/// A configured server that [`McpServerManager`] does not manage because its
/// transport is not stdio.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct UnsupportedMcpServer {
    /// Name of the configured server.
    pub server_name: String,
    /// Transport declared in the server configuration.
    pub transport: McpTransport,
    /// Explanation of why the server was skipped.
    pub reason: String,
}
219
/// Errors returned by [`McpServerManager`] operations.
#[derive(Debug)]
pub enum McpServerManagerError {
    /// I/O failure while spawning or talking to a server process.
    Io(io::Error),
    /// The server answered `method` with a JSON-RPC error object.
    JsonRpc {
        /// Name of the server that returned the error.
        server_name: String,
        /// JSON-RPC method that was being invoked.
        method: &'static str,
        /// Error object returned by the server.
        error: JsonRpcError,
    },
    /// The response to `method` was unusable (for example, it had no result
    /// payload), or the server process handle was unexpectedly missing.
    InvalidResponse {
        /// Name of the server involved.
        server_name: String,
        /// JSON-RPC method that was being invoked.
        method: &'static str,
        /// Description of what was wrong.
        details: String,
    },
    /// No route is registered for the requested qualified tool name.
    UnknownTool {
        /// Qualified tool name that was looked up.
        qualified_name: String,
    },
    /// No managed server is registered under the requested name.
    UnknownServer {
        /// Server name that was looked up.
        server_name: String,
    },
}
240
241impl std::fmt::Display for McpServerManagerError {
242 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243 match self {
244 Self::Io(error) => write!(f, "{error}"),
245 Self::JsonRpc {
246 server_name,
247 method,
248 error,
249 } => write!(
250 f,
251 "MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})",
252 error.message, error.code
253 ),
254 Self::InvalidResponse {
255 server_name,
256 method,
257 details,
258 } => write!(
259 f,
260 "MCP server `{server_name}` returned invalid response for {method}: {details}"
261 ),
262 Self::UnknownTool { qualified_name } => {
263 write!(f, "unknown MCP tool `{qualified_name}`")
264 }
265 Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"),
266 }
267 }
268}
269
270impl std::error::Error for McpServerManagerError {
271 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
272 match self {
273 Self::Io(error) => Some(error),
274 Self::JsonRpc { .. }
275 | Self::InvalidResponse { .. }
276 | Self::UnknownTool { .. }
277 | Self::UnknownServer { .. } => None,
278 }
279 }
280}
281
282impl From<io::Error> for McpServerManagerError {
283 fn from(value: io::Error) -> Self {
284 Self::Io(value)
285 }
286}
287
/// Routing entry mapping a qualified tool name to the server that owns it.
#[derive(Debug, Clone, PartialEq, Eq)]
struct ToolRoute {
    /// Name of the managed server that exposes the tool.
    server_name: String,
    /// Tool name as that server knows it.
    raw_name: String,
}
293
/// Per-server state tracked by [`McpServerManager`].
#[derive(Debug)]
struct ManagedMcpServer {
    /// Launch description used to spawn the server process.
    bootstrap: McpClientBootstrap,
    /// Running process handle; `None` until the server is first needed and
    /// again after shutdown.
    process: Option<McpStdioProcess>,
    /// Whether the `initialize` exchange has completed for the current process.
    initialized: bool,
}
300
301impl ManagedMcpServer {
302 fn new(bootstrap: McpClientBootstrap) -> Self {
303 Self {
304 bootstrap,
305 process: None,
306 initialized: false,
307 }
308 }
309}
310
/// Owns the stdio MCP servers declared in the configuration, spawning them on
/// demand and routing tool calls to the server that exposes each tool.
#[derive(Debug)]
pub struct McpServerManager {
    /// Managed stdio servers keyed by configured server name.
    servers: BTreeMap<String, ManagedMcpServer>,
    /// Configured servers skipped because their transport is not stdio.
    unsupported_servers: Vec<UnsupportedMcpServer>,
    /// Qualified tool name -> owning server and raw tool name.
    tool_index: BTreeMap<String, ToolRoute>,
    /// Next numeric JSON-RPC request id to hand out (shared by all servers).
    next_request_id: u64,
}
318
319impl McpServerManager {
320 #[must_use]
321 pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
322 Self::from_servers(config.mcp().servers())
323 }
324
325 #[must_use]
326 pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
327 let mut managed_servers = BTreeMap::new();
328 let mut unsupported_servers = Vec::new();
329
330 for (server_name, server_config) in servers {
331 if server_config.transport() == McpTransport::Stdio {
332 let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config);
333 managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
334 } else {
335 unsupported_servers.push(UnsupportedMcpServer {
336 server_name: server_name.clone(),
337 transport: server_config.transport(),
338 reason: format!(
339 "transport {:?} is not supported by McpServerManager",
340 server_config.transport()
341 ),
342 });
343 }
344 }
345
346 Self {
347 servers: managed_servers,
348 unsupported_servers,
349 tool_index: BTreeMap::new(),
350 next_request_id: 1,
351 }
352 }
353
354 #[must_use]
355 pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
356 &self.unsupported_servers
357 }
358
359 pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
360 let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
361 let mut discovered_tools = Vec::new();
362
363 for server_name in server_names {
364 self.ensure_server_ready(&server_name).await?;
365 self.clear_routes_for_server(&server_name);
366
367 let mut cursor = None;
368 loop {
369 let request_id = self.take_request_id();
370 let response = {
371 let server = self.server_mut(&server_name)?;
372 let process = server.process.as_mut().ok_or_else(|| {
373 McpServerManagerError::InvalidResponse {
374 server_name: server_name.clone(),
375 method: "tools/list",
376 details: "server process missing after initialization".to_string(),
377 }
378 })?;
379 process
380 .list_tools(
381 request_id,
382 Some(McpListToolsParams {
383 cursor: cursor.clone(),
384 }),
385 )
386 .await?
387 };
388
389 if let Some(error) = response.error {
390 return Err(McpServerManagerError::JsonRpc {
391 server_name: server_name.clone(),
392 method: "tools/list",
393 error,
394 });
395 }
396
397 let result =
398 response
399 .result
400 .ok_or_else(|| McpServerManagerError::InvalidResponse {
401 server_name: server_name.clone(),
402 method: "tools/list",
403 details: "missing result payload".to_string(),
404 })?;
405
406 for tool in result.tools {
407 let qualified_name = mcp_tool_name(&server_name, &tool.name);
408 self.tool_index.insert(
409 qualified_name.clone(),
410 ToolRoute {
411 server_name: server_name.clone(),
412 raw_name: tool.name.clone(),
413 },
414 );
415 discovered_tools.push(ManagedMcpTool {
416 server_name: server_name.clone(),
417 qualified_name,
418 raw_name: tool.name.clone(),
419 tool,
420 });
421 }
422
423 match result.next_cursor {
424 Some(next_cursor) => cursor = Some(next_cursor),
425 None => break,
426 }
427 }
428 }
429
430 Ok(discovered_tools)
431 }
432
433 pub async fn call_tool(
434 &mut self,
435 qualified_tool_name: &str,
436 arguments: Option<JsonValue>,
437 ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
438 let route = self
439 .tool_index
440 .get(qualified_tool_name)
441 .cloned()
442 .ok_or_else(|| McpServerManagerError::UnknownTool {
443 qualified_name: qualified_tool_name.to_string(),
444 })?;
445
446 self.ensure_server_ready(&route.server_name).await?;
447 let request_id = self.take_request_id();
448 let response =
449 {
450 let server = self.server_mut(&route.server_name)?;
451 let process = server.process.as_mut().ok_or_else(|| {
452 McpServerManagerError::InvalidResponse {
453 server_name: route.server_name.clone(),
454 method: "tools/call",
455 details: "server process missing after initialization".to_string(),
456 }
457 })?;
458 process
459 .call_tool(
460 request_id,
461 McpToolCallParams {
462 name: route.raw_name,
463 arguments,
464 meta: None,
465 },
466 )
467 .await?
468 };
469 Ok(response)
470 }
471
472 pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
473 let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
474 for server_name in server_names {
475 let server = self.server_mut(&server_name)?;
476 if let Some(process) = server.process.as_mut() {
477 process.shutdown().await?;
478 }
479 server.process = None;
480 server.initialized = false;
481 }
482 Ok(())
483 }
484
485 fn clear_routes_for_server(&mut self, server_name: &str) {
486 self.tool_index
487 .retain(|_, route| route.server_name != server_name);
488 }
489
490 fn server_mut(
491 &mut self,
492 server_name: &str,
493 ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
494 self.servers
495 .get_mut(server_name)
496 .ok_or_else(|| McpServerManagerError::UnknownServer {
497 server_name: server_name.to_string(),
498 })
499 }
500
501 fn take_request_id(&mut self) -> JsonRpcId {
502 let id = self.next_request_id;
503 self.next_request_id = self.next_request_id.saturating_add(1);
504 JsonRpcId::Number(id)
505 }
506
507 async fn ensure_server_ready(
508 &mut self,
509 server_name: &str,
510 ) -> Result<(), McpServerManagerError> {
511 let needs_spawn = self
512 .servers
513 .get(server_name)
514 .map(|server| server.process.is_none())
515 .ok_or_else(|| McpServerManagerError::UnknownServer {
516 server_name: server_name.to_string(),
517 })?;
518
519 if needs_spawn {
520 let server = self.server_mut(server_name)?;
521 server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?);
522 server.initialized = false;
523 }
524
525 let needs_initialize = self
526 .servers
527 .get(server_name)
528 .map(|server| !server.initialized)
529 .ok_or_else(|| McpServerManagerError::UnknownServer {
530 server_name: server_name.to_string(),
531 })?;
532
533 if needs_initialize {
534 let request_id = self.take_request_id();
535 let response = {
536 let server = self.server_mut(server_name)?;
537 let process = server.process.as_mut().ok_or_else(|| {
538 McpServerManagerError::InvalidResponse {
539 server_name: server_name.to_string(),
540 method: "initialize",
541 details: "server process missing before initialize".to_string(),
542 }
543 })?;
544 process
545 .initialize(request_id, default_initialize_params())
546 .await?
547 };
548
549 if let Some(error) = response.error {
550 return Err(McpServerManagerError::JsonRpc {
551 server_name: server_name.to_string(),
552 method: "initialize",
553 error,
554 });
555 }
556
557 if response.result.is_none() {
558 return Err(McpServerManagerError::InvalidResponse {
559 server_name: server_name.to_string(),
560 method: "initialize",
561 details: "missing result payload".to_string(),
562 });
563 }
564
565 let server = self.server_mut(server_name)?;
566 server.initialized = true;
567 }
568
569 Ok(())
570 }
571}
572
/// A spawned MCP server child process together with its piped stdin and
/// buffered stdout.
#[derive(Debug)]
pub struct McpStdioProcess {
    /// Handle to the child process.
    child: Child,
    /// Write end connected to the child's stdin.
    stdin: ChildStdin,
    /// Buffered reader over the child's stdout.
    stdout: BufReader<ChildStdout>,
}
579
580impl McpStdioProcess {
581 pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
582 let mut command = Command::new(&transport.command);
583 command
584 .args(&transport.args)
585 .stdin(Stdio::piped())
586 .stdout(Stdio::piped())
587 .stderr(Stdio::inherit());
588 apply_env(&mut command, &transport.env);
589
590 let mut child = command.spawn()?;
591 let stdin = child
592 .stdin
593 .take()
594 .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
595 let stdout = child
596 .stdout
597 .take()
598 .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
599
600 Ok(Self {
601 child,
602 stdin,
603 stdout: BufReader::new(stdout),
604 })
605 }
606
607 pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
608 self.stdin.write_all(bytes).await
609 }
610
611 pub async fn flush(&mut self) -> io::Result<()> {
612 self.stdin.flush().await
613 }
614
615 pub async fn write_line(&mut self, line: &str) -> io::Result<()> {
616 self.write_all(line.as_bytes()).await?;
617 self.write_all(b"\n").await?;
618 self.flush().await
619 }
620
621 pub async fn read_line(&mut self) -> io::Result<String> {
622 let mut line = String::new();
623 let bytes_read = self.stdout.read_line(&mut line).await?;
624 if bytes_read == 0 {
625 return Err(io::Error::new(
626 io::ErrorKind::UnexpectedEof,
627 "MCP stdio stream closed while reading line",
628 ));
629 }
630 Ok(line)
631 }
632
633 pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
634 let mut buffer = vec![0_u8; 4096];
635 let read = self.stdout.read(&mut buffer).await?;
636 buffer.truncate(read);
637 Ok(buffer)
638 }
639
640 pub async fn write_frame(&mut self, payload: &[u8]) -> io::Result<()> {
641 let encoded = encode_frame(payload);
642 self.write_all(&encoded).await?;
643 self.flush().await
644 }
645
646 pub async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
647 let mut content_length = None;
648 loop {
649 let mut line = String::new();
650 let bytes_read = self.stdout.read_line(&mut line).await?;
651 if bytes_read == 0 {
652 return Err(io::Error::new(
653 io::ErrorKind::UnexpectedEof,
654 "MCP stdio stream closed while reading headers",
655 ));
656 }
657 if line == "\r\n" {
658 break;
659 }
660 if let Some(value) = line.strip_prefix("Content-Length:") {
661 let parsed = value
662 .trim()
663 .parse::<usize>()
664 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
665 content_length = Some(parsed);
666 }
667 }
668
669 let content_length = content_length.ok_or_else(|| {
670 io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
671 })?;
672 let mut payload = vec![0_u8; content_length];
673 self.stdout.read_exact(&mut payload).await?;
674 Ok(payload)
675 }
676
677 pub async fn write_jsonrpc_message<T: Serialize>(&mut self, message: &T) -> io::Result<()> {
678 let body = serde_json::to_vec(message)
679 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
680 self.write_frame(&body).await
681 }
682
683 pub async fn read_jsonrpc_message<T: DeserializeOwned>(&mut self) -> io::Result<T> {
684 let payload = self.read_frame().await?;
685 serde_json::from_slice(&payload)
686 .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
687 }
688
689 pub async fn send_request<T: Serialize>(
690 &mut self,
691 request: &JsonRpcRequest<T>,
692 ) -> io::Result<()> {
693 self.write_jsonrpc_message(request).await
694 }
695
696 pub async fn read_response<T: DeserializeOwned>(&mut self) -> io::Result<JsonRpcResponse<T>> {
697 self.read_jsonrpc_message().await
698 }
699
700 pub async fn request<TParams: Serialize, TResult: DeserializeOwned>(
701 &mut self,
702 id: JsonRpcId,
703 method: impl Into<String>,
704 params: Option<TParams>,
705 ) -> io::Result<JsonRpcResponse<TResult>> {
706 let request = JsonRpcRequest::new(id, method, params);
707 self.send_request(&request).await?;
708 self.read_response().await
709 }
710
711 pub async fn initialize(
712 &mut self,
713 id: JsonRpcId,
714 params: McpInitializeParams,
715 ) -> io::Result<JsonRpcResponse<McpInitializeResult>> {
716 self.request(id, "initialize", Some(params)).await
717 }
718
719 pub async fn list_tools(
720 &mut self,
721 id: JsonRpcId,
722 params: Option<McpListToolsParams>,
723 ) -> io::Result<JsonRpcResponse<McpListToolsResult>> {
724 self.request(id, "tools/list", params).await
725 }
726
727 pub async fn call_tool(
728 &mut self,
729 id: JsonRpcId,
730 params: McpToolCallParams,
731 ) -> io::Result<JsonRpcResponse<McpToolCallResult>> {
732 self.request(id, "tools/call", Some(params)).await
733 }
734
735 pub async fn list_resources(
736 &mut self,
737 id: JsonRpcId,
738 params: Option<McpListResourcesParams>,
739 ) -> io::Result<JsonRpcResponse<McpListResourcesResult>> {
740 self.request(id, "resources/list", params).await
741 }
742
743 pub async fn read_resource(
744 &mut self,
745 id: JsonRpcId,
746 params: McpReadResourceParams,
747 ) -> io::Result<JsonRpcResponse<McpReadResourceResult>> {
748 self.request(id, "resources/read", Some(params)).await
749 }
750
751 pub async fn terminate(&mut self) -> io::Result<()> {
752 self.child.kill().await
753 }
754
755 pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
756 self.child.wait().await
757 }
758
759 async fn shutdown(&mut self) -> io::Result<()> {
760 if self.child.try_wait()?.is_none() {
761 self.child.kill().await?;
762 }
763 let _ = self.child.wait().await?;
764 Ok(())
765 }
766}
767
768pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
769 match &bootstrap.transport {
770 McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
771 other => Err(io::Error::new(
772 io::ErrorKind::InvalidInput,
773 format!(
774 "MCP bootstrap transport for {} is not stdio: {other:?}",
775 bootstrap.server_name
776 ),
777 )),
778 }
779}
780
/// Adds every `key = value` pair from `env` to the environment of `command`.
fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
    command.envs(env);
}
786
/// Prefixes `payload` with a `Content-Length: <len>` header and a blank line
/// (both CRLF-terminated) and returns the complete frame.
fn encode_frame(payload: &[u8]) -> Vec<u8> {
    let prefix = format!("Content-Length: {}\r\n\r\n", payload.len());
    [prefix.as_bytes(), payload].concat()
}
793
794fn default_initialize_params() -> McpInitializeParams {
795 McpInitializeParams {
796 protocol_version: "2025-03-26".to_string(),
797 capabilities: JsonValue::Object(serde_json::Map::new()),
798 client_info: McpInitializeClientInfo {
799 name: "runtime".to_string(),
800 version: env!("CARGO_PKG_VERSION").to_string(),
801 },
802 }
803}
804
805#[cfg(test)]
806mod tests {
807 use std::collections::BTreeMap;
808 use std::fs;
809 use std::io::ErrorKind;
810 use std::os::unix::fs::PermissionsExt;
811 use std::path::{Path, PathBuf};
812 use std::process::Command;
813 use std::time::{SystemTime, UNIX_EPOCH};
814
815 use serde_json::json;
816 use tokio::runtime::Builder;
817
818 use crate::config::{
819 ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig,
820 McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig,
821 };
822 use crate::mcp::mcp_tool_name;
823 use crate::mcp_client::McpClientBootstrap;
824
825 use super::{
826 spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
827 McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
828 McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
829 McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
830 };
831
832 fn temp_dir() -> PathBuf {
833 let nanos = SystemTime::now()
834 .duration_since(UNIX_EPOCH)
835 .expect("time should be after epoch")
836 .as_nanos();
837 std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}"))
838 }
839
840 fn write_echo_script() -> PathBuf {
841 let root = temp_dir();
842 fs::create_dir_all(&root).expect("temp dir");
843 let script_path = root.join("echo-mcp.sh");
844 fs::write(
845 &script_path,
846 "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
847 )
848 .expect("write script");
849 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
850 permissions.set_mode(0o755);
851 fs::set_permissions(&script_path, permissions).expect("chmod");
852 script_path
853 }
854
855 fn write_jsonrpc_script() -> PathBuf {
856 let root = temp_dir();
857 fs::create_dir_all(&root).expect("temp dir");
858 let script_path = root.join("jsonrpc-mcp.py");
859 let script = [
860 "#!/usr/bin/env python3",
861 "import json, sys",
862 "header = b''",
863 r"while not header.endswith(b'\r\n\r\n'):",
864 " chunk = sys.stdin.buffer.read(1)",
865 " if not chunk:",
866 " raise SystemExit(1)",
867 " header += chunk",
868 "length = 0",
869 r"for line in header.decode().split('\r\n'):",
870 r" if line.lower().startswith('content-length:'):",
871 r" length = int(line.split(':', 1)[1].strip())",
872 "payload = sys.stdin.buffer.read(length)",
873 "request = json.loads(payload.decode())",
874 r"assert request['jsonrpc'] == '2.0'",
875 r"assert request['method'] == 'initialize'",
876 r"response = json.dumps({",
877 r" 'jsonrpc': '2.0',",
878 r" 'id': request['id'],",
879 r" 'result': {",
880 r" 'protocolVersion': request['params']['protocolVersion'],",
881 r" 'capabilities': {'tools': {}},",
882 r" 'serverInfo': {'name': 'fake-mcp', 'version': '0.1.0'}",
883 r" }",
884 r"}).encode()",
885 r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)",
886 "sys.stdout.buffer.flush()",
887 "",
888 ]
889 .join("\n");
890 fs::write(&script_path, script).expect("write script");
891 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
892 permissions.set_mode(0o755);
893 fs::set_permissions(&script_path, permissions).expect("chmod");
894 script_path
895 }
896
897 #[allow(clippy::too_many_lines)]
898 fn write_mcp_server_script() -> PathBuf {
899 let root = temp_dir();
900 fs::create_dir_all(&root).expect("temp dir");
901 let script_path = root.join("fake-mcp-server.py");
902 let script = [
903 "#!/usr/bin/env python3",
904 "import json, sys",
905 "",
906 "def read_message():",
907 " header = b''",
908 r" while not header.endswith(b'\r\n\r\n'):",
909 " chunk = sys.stdin.buffer.read(1)",
910 " if not chunk:",
911 " return None",
912 " header += chunk",
913 " length = 0",
914 r" for line in header.decode().split('\r\n'):",
915 r" if line.lower().startswith('content-length:'):",
916 r" length = int(line.split(':', 1)[1].strip())",
917 " payload = sys.stdin.buffer.read(length)",
918 " return json.loads(payload.decode())",
919 "",
920 "def send_message(message):",
921 " payload = json.dumps(message).encode()",
922 r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
923 " sys.stdout.buffer.flush()",
924 "",
925 "while True:",
926 " request = read_message()",
927 " if request is None:",
928 " break",
929 " method = request['method']",
930 " if method == 'initialize':",
931 " send_message({",
932 " 'jsonrpc': '2.0',",
933 " 'id': request['id'],",
934 " 'result': {",
935 " 'protocolVersion': request['params']['protocolVersion'],",
936 " 'capabilities': {'tools': {}, 'resources': {}},",
937 " 'serverInfo': {'name': 'fake-mcp', 'version': '0.2.0'}",
938 " }",
939 " })",
940 " elif method == 'tools/list':",
941 " send_message({",
942 " 'jsonrpc': '2.0',",
943 " 'id': request['id'],",
944 " 'result': {",
945 " 'tools': [",
946 " {",
947 " 'name': 'echo',",
948 " 'description': 'Echoes text',",
949 " 'inputSchema': {",
950 " 'type': 'object',",
951 " 'properties': {'text': {'type': 'string'}},",
952 " 'required': ['text']",
953 " }",
954 " }",
955 " ]",
956 " }",
957 " })",
958 " elif method == 'tools/call':",
959 " args = request['params'].get('arguments') or {}",
960 " if request['params']['name'] == 'fail':",
961 " send_message({",
962 " 'jsonrpc': '2.0',",
963 " 'id': request['id'],",
964 " 'error': {'code': -32001, 'message': 'tool failed'},",
965 " })",
966 " else:",
967 " text = args.get('text', '')",
968 " send_message({",
969 " 'jsonrpc': '2.0',",
970 " 'id': request['id'],",
971 " 'result': {",
972 " 'content': [{'type': 'text', 'text': f'echo:{text}'}],",
973 " 'structuredContent': {'echoed': text},",
974 " 'isError': False",
975 " }",
976 " })",
977 " elif method == 'resources/list':",
978 " send_message({",
979 " 'jsonrpc': '2.0',",
980 " 'id': request['id'],",
981 " 'result': {",
982 " 'resources': [",
983 " {",
984 " 'uri': 'file://guide.txt',",
985 " 'name': 'guide',",
986 " 'description': 'Guide text',",
987 " 'mimeType': 'text/plain'",
988 " }",
989 " ]",
990 " }",
991 " })",
992 " elif method == 'resources/read':",
993 " uri = request['params']['uri']",
994 " send_message({",
995 " 'jsonrpc': '2.0',",
996 " 'id': request['id'],",
997 " 'result': {",
998 " 'contents': [",
999 " {",
1000 " 'uri': uri,",
1001 " 'mimeType': 'text/plain',",
1002 " 'text': f'contents for {uri}'",
1003 " }",
1004 " ]",
1005 " }",
1006 " })",
1007 " else:",
1008 " send_message({",
1009 " 'jsonrpc': '2.0',",
1010 " 'id': request['id'],",
1011 " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
1012 " })",
1013 "",
1014 ]
1015 .join("\n");
1016 fs::write(&script_path, script).expect("write script");
1017 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
1018 permissions.set_mode(0o755);
1019 fs::set_permissions(&script_path, permissions).expect("chmod");
1020 script_path
1021 }
1022
1023 #[allow(clippy::too_many_lines)]
1024 fn write_manager_mcp_server_script() -> PathBuf {
1025 let root = temp_dir();
1026 fs::create_dir_all(&root).expect("temp dir");
1027 let script_path = root.join("manager-mcp-server.py");
1028 let script = [
1029 "#!/usr/bin/env python3",
1030 "import json, os, sys",
1031 "",
1032 "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')",
1033 "LOG_PATH = os.environ.get('MCP_LOG_PATH')",
1034 "initialize_count = 0",
1035 "",
1036 "def log(method):",
1037 " if LOG_PATH:",
1038 " with open(LOG_PATH, 'a', encoding='utf-8') as handle:",
1039 " handle.write(f'{method}\\n')",
1040 "",
1041 "def read_message():",
1042 " header = b''",
1043 r" while not header.endswith(b'\r\n\r\n'):",
1044 " chunk = sys.stdin.buffer.read(1)",
1045 " if not chunk:",
1046 " return None",
1047 " header += chunk",
1048 " length = 0",
1049 r" for line in header.decode().split('\r\n'):",
1050 r" if line.lower().startswith('content-length:'):",
1051 r" length = int(line.split(':', 1)[1].strip())",
1052 " payload = sys.stdin.buffer.read(length)",
1053 " return json.loads(payload.decode())",
1054 "",
1055 "def send_message(message):",
1056 " payload = json.dumps(message).encode()",
1057 r" sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
1058 " sys.stdout.buffer.flush()",
1059 "",
1060 "while True:",
1061 " request = read_message()",
1062 " if request is None:",
1063 " break",
1064 " method = request['method']",
1065 " log(method)",
1066 " if method == 'initialize':",
1067 " initialize_count += 1",
1068 " send_message({",
1069 " 'jsonrpc': '2.0',",
1070 " 'id': request['id'],",
1071 " 'result': {",
1072 " 'protocolVersion': request['params']['protocolVersion'],",
1073 " 'capabilities': {'tools': {}},",
1074 " 'serverInfo': {'name': LABEL, 'version': '1.0.0'}",
1075 " }",
1076 " })",
1077 " elif method == 'tools/list':",
1078 " send_message({",
1079 " 'jsonrpc': '2.0',",
1080 " 'id': request['id'],",
1081 " 'result': {",
1082 " 'tools': [",
1083 " {",
1084 " 'name': 'echo',",
1085 " 'description': f'Echo tool for {LABEL}',",
1086 " 'inputSchema': {",
1087 " 'type': 'object',",
1088 " 'properties': {'text': {'type': 'string'}},",
1089 " 'required': ['text']",
1090 " }",
1091 " }",
1092 " ]",
1093 " }",
1094 " })",
1095 " elif method == 'tools/call':",
1096 " args = request['params'].get('arguments') or {}",
1097 " text = args.get('text', '')",
1098 " send_message({",
1099 " 'jsonrpc': '2.0',",
1100 " 'id': request['id'],",
1101 " 'result': {",
1102 " 'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],",
1103 " 'structuredContent': {",
1104 " 'server': LABEL,",
1105 " 'echoed': text,",
1106 " 'initializeCount': initialize_count",
1107 " },",
1108 " 'isError': False",
1109 " }",
1110 " })",
1111 " else:",
1112 " send_message({",
1113 " 'jsonrpc': '2.0',",
1114 " 'id': request['id'],",
1115 " 'error': {'code': -32601, 'message': f'unknown method: {method}'},",
1116 " })",
1117 "",
1118 ]
1119 .join("\n");
1120 fs::write(&script_path, script).expect("write script");
1121 let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
1122 permissions.set_mode(0o755);
1123 fs::set_permissions(&script_path, permissions).expect("chmod");
1124 script_path
1125 }
1126
1127 fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
1128 let config = ScopedMcpServerConfig {
1129 scope: ConfigSource::Local,
1130 config: McpServerConfig::Stdio(McpStdioServerConfig {
1131 command: "/bin/sh".to_string(),
1132 args: vec![script_path.to_string_lossy().into_owned()],
1133 env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
1134 }),
1135 };
1136 McpClientBootstrap::from_scoped_config("stdio server", &config)
1137 }
1138
1139 fn script_transport(script_path: &Path) -> crate::mcp_client::McpStdioTransport {
1140 crate::mcp_client::McpStdioTransport {
1141 command: python_command(),
1142 args: vec![script_path.to_string_lossy().into_owned()],
1143 env: BTreeMap::new(),
1144 }
1145 }
1146
/// Resolves the Python interpreter used to run the helper scripts in these tests.
///
/// The first non-blank value among the `MCP_TEST_PYTHON`, `PYTHON3`, and `PYTHON`
/// environment variables wins and is returned with surrounding whitespace removed.
/// Otherwise `python3` and then `python` are probed by running `<candidate> --version`;
/// a candidate is accepted only when it launches and exits successfully.
///
/// # Panics
///
/// Panics when no override is set and neither candidate runs successfully.
fn python_command() -> String {
    for key in ["MCP_TEST_PYTHON", "PYTHON3", "PYTHON"] {
        if let Ok(value) = std::env::var(key) {
            // The blank check is done on the trimmed value, so return the trimmed value too;
            // a padded override would otherwise be handed to the spawner verbatim.
            let trimmed = value.trim();
            if !trimmed.is_empty() {
                return trimmed.to_string();
            }
        }
    }

    for candidate in ["python3", "python"] {
        // Being spawnable is not enough: require a successful exit status as well.
        let usable = matches!(
            Command::new(candidate).arg("--version").output(),
            Ok(output) if output.status.success()
        );
        if usable {
            return candidate.to_string();
        }
    }

    panic!("expected a Python interpreter for MCP stdio tests")
}
1164
/// Deletes the script file and then the directory tree of its parent.
///
/// A `NotFound` error from either removal is tolerated, so calling this on an already
/// cleaned-up path is fine.
///
/// # Panics
///
/// Panics when `script_path` has no parent, or when a removal fails with any error kind
/// other than `NotFound`.
fn cleanup_script(script_path: &Path) {
    match fs::remove_file(script_path) {
        Ok(()) => {}
        Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound, "cleanup script"),
    }

    let parent = script_path.parent().expect("script parent");
    match fs::remove_dir_all(parent) {
        Ok(()) => {}
        Err(error) => assert_eq!(error.kind(), std::io::ErrorKind::NotFound, "cleanup dir"),
    }
}
1173
1174 fn manager_server_config(
1175 script_path: &Path,
1176 label: &str,
1177 log_path: &Path,
1178 ) -> ScopedMcpServerConfig {
1179 ScopedMcpServerConfig {
1180 scope: ConfigSource::Local,
1181 config: McpServerConfig::Stdio(McpStdioServerConfig {
1182 command: python_command(),
1183 args: vec![script_path.to_string_lossy().into_owned()],
1184 env: BTreeMap::from([
1185 ("MCP_SERVER_LABEL".to_string(), label.to_string()),
1186 (
1187 "MCP_LOG_PATH".to_string(),
1188 log_path.to_string_lossy().into_owned(),
1189 ),
1190 ]),
1191 }),
1192 }
1193 }
1194
/// Spawns the echo script through a bootstrap, then checks the `READY:` line, one echoed
/// line, and a successful exit.
#[test]
fn spawns_stdio_process_and_round_trips_io() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_echo_script();
        let bootstrap = sample_bootstrap(&script);
        let mut child = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");

        // The first line must carry the token configured by `sample_bootstrap`.
        let first_line = child.read_line().await.expect("read ready");
        assert_eq!(first_line, "READY:secret-value\n");

        child
            .write_line("ping from client")
            .await
            .expect("write line");
        let second_line = child.read_line().await.expect("read echo");
        assert_eq!(second_line, "ECHO:ping from client\n");

        let exit = child.wait().await.expect("wait for exit");
        assert!(exit.success());

        cleanup_script(&script);
    });
}
1223
/// Spawning from a bootstrap built on an SDK server config must fail with `InvalidInput`.
#[test]
fn rejects_non_stdio_bootstrap() {
    let sdk = crate::config::McpSdkServerConfig {
        name: "sdk-server".to_string(),
    };
    let scoped = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Sdk(sdk),
    };
    let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &scoped);

    let failure = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");

    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
1236
/// Sends `initialize` to the script from `write_jsonrpc_script` and checks the typed reply:
/// matching id, no error, and the expected protocol version, capabilities, and server info.
#[test]
fn round_trips_initialize_request_and_response_over_stdio_frames() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_jsonrpc_script();
        let transport = script_transport(&script);
        let mut child = McpStdioProcess::spawn(&transport).expect("spawn transport directly");

        let params = McpInitializeParams {
            protocol_version: "2025-03-26".to_string(),
            capabilities: json!({"roots": {}}),
            client_info: McpInitializeClientInfo {
                name: "runtime-tests".to_string(),
                version: "0.1.0".to_string(),
            },
        };
        let reply = child
            .initialize(JsonRpcId::Number(1), params)
            .await
            .expect("initialize roundtrip");

        let expected = McpInitializeResult {
            protocol_version: "2025-03-26".to_string(),
            capabilities: json!({"tools": {}}),
            server_info: McpInitializeServerInfo {
                name: "fake-mcp".to_string(),
                version: "0.1.0".to_string(),
            },
        };
        assert_eq!(reply.id, JsonRpcId::Number(1));
        assert_eq!(reply.error, None);
        assert_eq!(reply.result, Some(expected));

        let exit = child.wait().await.expect("wait for exit");
        assert!(exit.success());

        cleanup_script(&script);
    });
}
1283
/// Sends a hand-built `initialize` request with `send_request`, reads the reply with
/// `read_response`, and checks that the id and `jsonrpc` version come back as expected.
#[test]
fn write_jsonrpc_request_emits_content_length_frame() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_jsonrpc_script();
        let transport = script_transport(&script);
        let mut child = McpStdioProcess::spawn(&transport).expect("spawn transport directly");

        let payload = json!({
            "protocolVersion": "2025-03-26",
            "capabilities": {},
            "clientInfo": {"name": "runtime-tests", "version": "0.1.0"}
        });
        let outgoing = JsonRpcRequest::new(JsonRpcId::Number(7), "initialize", Some(payload));

        child.send_request(&outgoing).await.expect("send request");
        let reply: JsonRpcResponse<serde_json::Value> =
            child.read_response().await.expect("read response");

        assert_eq!(reply.id, JsonRpcId::Number(7));
        assert_eq!(reply.jsonrpc, "2.0");

        let exit = child.wait().await.expect("wait for exit");
        assert!(exit.success());

        cleanup_script(&script);
    });
}
1317
/// Spawns the echo script straight from a transport and checks that the transport's
/// `MCP_TEST_TOKEN` value shows up in the `READY:` output.
#[test]
fn direct_spawn_uses_transport_env() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_echo_script();

        let mut env = BTreeMap::new();
        env.insert("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string());
        let transport = crate::mcp_client::McpStdioTransport {
            command: "/bin/sh".to_string(),
            args: vec![script.to_string_lossy().into_owned()],
            env,
        };

        let mut child = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
        let ready_bytes = child.read_available().await.expect("read ready");
        assert_eq!(
            String::from_utf8_lossy(&ready_bytes),
            "READY:direct-secret\n"
        );

        // The child is stopped explicitly here, so its exit status is not checked.
        child.terminate().await.expect("terminate child");
        let _ = child.wait().await.expect("wait after kill");

        cleanup_script(&script);
    });
}
1340
/// Exercises `tools/list`, `tools/call`, `list_resources`, and `read_resource` against the
/// script from `write_mcp_server_script` and checks each typed result.
#[test]
fn lists_tools_calls_tool_and_reads_resources_over_jsonrpc() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_mcp_server_script();
        let transport = script_transport(&script);
        let mut child = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");

        // Tool listing: exactly one `echo` tool with the expected schema.
        let listed = child
            .list_tools(JsonRpcId::Number(2), None)
            .await
            .expect("list tools");
        let expected_tools = McpListToolsResult {
            tools: vec![McpTool {
                name: "echo".to_string(),
                description: Some("Echoes text".to_string()),
                input_schema: Some(json!({
                    "type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"]
                })),
                annotations: None,
                meta: None,
            }],
            next_cursor: None,
        };
        assert_eq!(listed.error, None);
        assert_eq!(listed.id, JsonRpcId::Number(2));
        assert_eq!(listed.result, Some(expected_tools));

        // Tool call: a string id is used here and the echoed payload is inspected.
        let call_params = McpToolCallParams {
            name: "echo".to_string(),
            arguments: Some(json!({"text": "hello"})),
            meta: None,
        };
        let called = child
            .call_tool(JsonRpcId::String("call-1".to_string()), call_params)
            .await
            .expect("call tool");
        assert_eq!(called.error, None);
        let outcome = called.result.expect("tool result");
        assert_eq!(outcome.is_error, Some(false));
        assert_eq!(outcome.structured_content, Some(json!({"echoed": "hello"})));
        assert_eq!(outcome.content.len(), 1);
        assert_eq!(outcome.content[0].kind, "text");
        assert_eq!(
            outcome.content[0].data.get("text"),
            Some(&json!("echo:hello"))
        );

        // Resource listing: a single plain-text guide.
        let listed_resources = child
            .list_resources(JsonRpcId::Number(3), None)
            .await
            .expect("list resources");
        let resource_page = listed_resources.result.expect("resources result");
        assert_eq!(resource_page.resources.len(), 1);
        assert_eq!(resource_page.resources[0].uri, "file://guide.txt");
        assert_eq!(
            resource_page.resources[0].mime_type.as_deref(),
            Some("text/plain")
        );

        // Resource read: the contents are returned as text for the requested URI.
        let read_params = McpReadResourceParams {
            uri: "file://guide.txt".to_string(),
        };
        let fetched = child
            .read_resource(JsonRpcId::Number(4), read_params)
            .await
            .expect("read resource");
        let expected_read = McpReadResourceResult {
            contents: vec![super::McpResourceContents {
                uri: "file://guide.txt".to_string(),
                mime_type: Some("text/plain".to_string()),
                text: Some("contents for file://guide.txt".to_string()),
                blob: None,
                meta: None,
            }],
        };
        assert_eq!(fetched.result, Some(expected_read));

        child.terminate().await.expect("terminate child");
        let _ = child.wait().await.expect("wait after kill");
        cleanup_script(&script);
    });
}
1440
/// Calls the `fail` tool and checks that the JSON-RPC error (code `-32001`, message
/// `tool failed`) is returned in the response instead of a result.
#[test]
fn surfaces_jsonrpc_errors_from_tool_calls() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_mcp_server_script();
        let transport = script_transport(&script);
        let mut child = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");

        let call_params = McpToolCallParams {
            name: "fail".to_string(),
            arguments: None,
            meta: None,
        };
        let reply = child
            .call_tool(JsonRpcId::Number(9), call_params)
            .await
            .expect("call tool with error response");

        assert_eq!(reply.id, JsonRpcId::Number(9));
        assert!(reply.result.is_none());
        let reported = reply.error.as_ref();
        assert_eq!(reported.map(|error| error.code), Some(-32001));
        assert_eq!(
            reported.map(|error| error.message.as_str()),
            Some("tool failed")
        );

        child.terminate().await.expect("terminate child");
        let _ = child.wait().await.expect("wait after kill");
        cleanup_script(&script);
    });
}
1477
/// Discovers tools through a manager with one stdio server (`alpha`) and checks the single
/// discovered tool's server name, raw name, and qualified name.
#[test]
fn manager_discovers_tools_from_stdio_config() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_manager_mcp_server_script();
        let workdir = script.parent().expect("script parent");
        let alpha_log = workdir.join("alpha.log");

        let mut servers = BTreeMap::new();
        servers.insert(
            "alpha".to_string(),
            manager_server_config(&script, "alpha", &alpha_log),
        );
        let mut manager = McpServerManager::from_servers(&servers);

        let discovered = manager.discover_tools().await.expect("discover tools");

        assert_eq!(discovered.len(), 1);
        let only = &discovered[0];
        assert_eq!(only.server_name, "alpha");
        assert_eq!(only.raw_name, "echo");
        assert_eq!(only.qualified_name, mcp_tool_name("alpha", "echo"));
        assert_eq!(only.tool.name, "echo");
        assert!(manager.unsupported_servers().is_empty());

        manager.shutdown().await.expect("shutdown");
        cleanup_script(&script);
    });
}
1507
/// Registers two stdio servers (`alpha`, `beta`) backed by the same script and checks that
/// each qualified tool call is answered by the server named in its `structuredContent`.
#[test]
fn manager_routes_tool_calls_to_correct_server() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_manager_mcp_server_script();
        let workdir = script.parent().expect("script parent");
        let alpha_log = workdir.join("alpha.log");
        let beta_log = workdir.join("beta.log");

        let mut servers = BTreeMap::new();
        servers.insert(
            "alpha".to_string(),
            manager_server_config(&script, "alpha", &alpha_log),
        );
        servers.insert(
            "beta".to_string(),
            manager_server_config(&script, "beta", &beta_log),
        );
        let mut manager = McpServerManager::from_servers(&servers);

        let discovered = manager.discover_tools().await.expect("discover tools");
        assert_eq!(discovered.len(), 2);

        let alpha_reply = manager
            .call_tool(
                &mcp_tool_name("alpha", "echo"),
                Some(json!({"text": "hello"})),
            )
            .await
            .expect("call alpha tool");
        let beta_reply = manager
            .call_tool(
                &mcp_tool_name("beta", "echo"),
                Some(json!({"text": "world"})),
            )
            .await
            .expect("call beta tool");

        // Each reply reports which server produced it.
        let alpha_server = alpha_reply
            .result
            .as_ref()
            .and_then(|outcome| outcome.structured_content.as_ref())
            .and_then(|content| content.get("server"));
        assert_eq!(alpha_server, Some(&json!("alpha")));

        let beta_server = beta_reply
            .result
            .as_ref()
            .and_then(|outcome| outcome.structured_content.as_ref())
            .and_then(|content| content.get("server"));
        assert_eq!(beta_server, Some(&json!("beta")));

        manager.shutdown().await.expect("shutdown");
        cleanup_script(&script);
    });
}
1569
/// Builds a manager from HTTP, SDK, and WebSocket configs and checks that all three are
/// listed as unsupported, in the order `http`, `sdk`, `ws`.
#[test]
fn manager_records_unsupported_non_stdio_servers_without_panicking() {
    let http = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Http(McpRemoteServerConfig {
            url: "https://example.test/mcp".to_string(),
            headers: BTreeMap::new(),
            headers_helper: None,
            oauth: None,
        }),
    };
    let sdk = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Sdk(McpSdkServerConfig {
            name: "sdk-server".to_string(),
        }),
    };
    let ws = ScopedMcpServerConfig {
        scope: ConfigSource::Local,
        config: McpServerConfig::Ws(McpWebSocketServerConfig {
            url: "wss://example.test/mcp".to_string(),
            headers: BTreeMap::new(),
            headers_helper: None,
        }),
    };
    let servers = BTreeMap::from([
        ("http".to_string(), http),
        ("sdk".to_string(), sdk),
        ("ws".to_string(), ws),
    ]);

    let manager = McpServerManager::from_servers(&servers);
    let unsupported = manager.unsupported_servers();

    assert_eq!(unsupported.len(), 3);
    for (index, expected_name) in ["http", "sdk", "ws"].iter().enumerate() {
        assert_eq!(unsupported[index].server_name, *expected_name);
    }
}
1615
/// Runs discovery so a server is in use, then checks that `shutdown` succeeds twice in a row.
#[test]
fn manager_shutdown_terminates_spawned_children_and_is_idempotent() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_manager_mcp_server_script();
        let workdir = script.parent().expect("script parent");
        let alpha_log = workdir.join("alpha.log");

        let mut servers = BTreeMap::new();
        servers.insert(
            "alpha".to_string(),
            manager_server_config(&script, "alpha", &alpha_log),
        );
        let mut manager = McpServerManager::from_servers(&servers);

        manager.discover_tools().await.expect("discover tools");

        // The second call must also return Ok.
        manager.shutdown().await.expect("first shutdown");
        manager.shutdown().await.expect("second shutdown");

        cleanup_script(&script);
    });
}
1639
/// Runs discovery followed by a tool call and checks that the server saw `initialize` only
/// once: both via the reported `initializeCount` and via the method log it wrote.
#[test]
fn manager_reuses_spawned_server_between_discovery_and_call() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_manager_mcp_server_script();
        let workdir = script.parent().expect("script parent");
        let alpha_log = workdir.join("alpha.log");

        let mut servers = BTreeMap::new();
        servers.insert(
            "alpha".to_string(),
            manager_server_config(&script, "alpha", &alpha_log),
        );
        let mut manager = McpServerManager::from_servers(&servers);

        manager.discover_tools().await.expect("discover tools");
        let reply = manager
            .call_tool(
                &mcp_tool_name("alpha", "echo"),
                Some(json!({"text": "reuse"})),
            )
            .await
            .expect("call tool");

        let initialize_count = reply
            .result
            .as_ref()
            .and_then(|outcome| outcome.structured_content.as_ref())
            .and_then(|content| content.get("initializeCount"));
        assert_eq!(initialize_count, Some(&json!(1)));

        // The log lists every method the server received, one per line.
        let recorded = fs::read_to_string(&alpha_log).expect("read log");
        let methods: Vec<&str> = recorded.lines().collect();
        assert_eq!(
            methods
                .iter()
                .filter(|method| **method == "initialize")
                .count(),
            1
        );
        assert_eq!(methods, vec!["initialize", "tools/list", "tools/call"]);

        manager.shutdown().await.expect("shutdown");
        cleanup_script(&script);
    });
}
1685
/// Calls a qualified tool name that was never discovered and checks that the manager
/// returns `UnknownTool` carrying that same qualified name.
#[test]
fn manager_reports_unknown_qualified_tool_name() {
    let rt = Builder::new_current_thread()
        .enable_all()
        .build()
        .expect("runtime");

    rt.block_on(async {
        let script = write_manager_mcp_server_script();
        let workdir = script.parent().expect("script parent");
        let alpha_log = workdir.join("alpha.log");

        let mut servers = BTreeMap::new();
        servers.insert(
            "alpha".to_string(),
            manager_server_config(&script, "alpha", &alpha_log),
        );
        let mut manager = McpServerManager::from_servers(&servers);

        let failure = manager
            .call_tool(
                &mcp_tool_name("alpha", "missing"),
                Some(json!({"text": "nope"})),
            )
            .await
            .expect_err("unknown qualified tool should fail");

        match failure {
            McpServerManagerError::UnknownTool { qualified_name } => {
                assert_eq!(qualified_name, mcp_tool_name("alpha", "missing"));
            }
            unexpected => panic!("expected unknown tool error, got {:?}", unexpected),
        }

        cleanup_script(&script);
    });
}
1720}