Skip to main content

wraith_runtime/
mcp_stdio.rs

1use std::collections::BTreeMap;
2use std::io;
3use std::process::Stdio;
4
5use serde::de::DeserializeOwned;
6use serde::{Deserialize, Serialize};
7use serde_json::Value as JsonValue;
8use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
9use tokio::process::{Child, ChildStdin, ChildStdout, Command};
10
11use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
12use crate::mcp::mcp_tool_name;
13use crate::mcp_client::{McpClientBootstrap, McpClientTransport, McpStdioTransport};
14
15#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
16#[serde(untagged)]
17pub enum JsonRpcId {
18    Number(u64),
19    String(String),
20    Null,
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
24pub struct JsonRpcRequest<T = JsonValue> {
25    pub jsonrpc: String,
26    pub id: JsonRpcId,
27    pub method: String,
28    #[serde(skip_serializing_if = "Option::is_none")]
29    pub params: Option<T>,
30}
31
32impl<T> JsonRpcRequest<T> {
33    #[must_use]
34    pub fn new(id: JsonRpcId, method: impl Into<String>, params: Option<T>) -> Self {
35        Self {
36            jsonrpc: "2.0".to_string(),
37            id,
38            method: method.into(),
39            params,
40        }
41    }
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
45pub struct JsonRpcError {
46    pub code: i64,
47    pub message: String,
48    #[serde(skip_serializing_if = "Option::is_none")]
49    pub data: Option<JsonValue>,
50}
51
52#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
53pub struct JsonRpcResponse<T = JsonValue> {
54    pub jsonrpc: String,
55    pub id: JsonRpcId,
56    #[serde(skip_serializing_if = "Option::is_none")]
57    pub result: Option<T>,
58    #[serde(skip_serializing_if = "Option::is_none")]
59    pub error: Option<JsonRpcError>,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
63#[serde(rename_all = "camelCase")]
64pub struct McpInitializeParams {
65    pub protocol_version: String,
66    pub capabilities: JsonValue,
67    pub client_info: McpInitializeClientInfo,
68}
69
70#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
71#[serde(rename_all = "camelCase")]
72pub struct McpInitializeClientInfo {
73    pub name: String,
74    pub version: String,
75}
76
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
78#[serde(rename_all = "camelCase")]
79pub struct McpInitializeResult {
80    pub protocol_version: String,
81    pub capabilities: JsonValue,
82    pub server_info: McpInitializeServerInfo,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
86#[serde(rename_all = "camelCase")]
87pub struct McpInitializeServerInfo {
88    pub name: String,
89    pub version: String,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
93#[serde(rename_all = "camelCase")]
94pub struct McpListToolsParams {
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub cursor: Option<String>,
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
100pub struct McpTool {
101    pub name: String,
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub description: Option<String>,
104    #[serde(rename = "inputSchema", skip_serializing_if = "Option::is_none")]
105    pub input_schema: Option<JsonValue>,
106    #[serde(skip_serializing_if = "Option::is_none")]
107    pub annotations: Option<JsonValue>,
108    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
109    pub meta: Option<JsonValue>,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
113#[serde(rename_all = "camelCase")]
114pub struct McpListToolsResult {
115    pub tools: Vec<McpTool>,
116    #[serde(skip_serializing_if = "Option::is_none")]
117    pub next_cursor: Option<String>,
118}
119
120#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
121#[serde(rename_all = "camelCase")]
122pub struct McpToolCallParams {
123    pub name: String,
124    #[serde(skip_serializing_if = "Option::is_none")]
125    pub arguments: Option<JsonValue>,
126    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
127    pub meta: Option<JsonValue>,
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
131pub struct McpToolCallContent {
132    #[serde(rename = "type")]
133    pub kind: String,
134    #[serde(flatten)]
135    pub data: BTreeMap<String, JsonValue>,
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
139#[serde(rename_all = "camelCase")]
140pub struct McpToolCallResult {
141    #[serde(default)]
142    pub content: Vec<McpToolCallContent>,
143    #[serde(default)]
144    pub structured_content: Option<JsonValue>,
145    #[serde(default)]
146    pub is_error: Option<bool>,
147    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
148    pub meta: Option<JsonValue>,
149}
150
151#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
152#[serde(rename_all = "camelCase")]
153pub struct McpListResourcesParams {
154    #[serde(skip_serializing_if = "Option::is_none")]
155    pub cursor: Option<String>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
159pub struct McpResource {
160    pub uri: String,
161    #[serde(skip_serializing_if = "Option::is_none")]
162    pub name: Option<String>,
163    #[serde(skip_serializing_if = "Option::is_none")]
164    pub description: Option<String>,
165    #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
166    pub mime_type: Option<String>,
167    #[serde(skip_serializing_if = "Option::is_none")]
168    pub annotations: Option<JsonValue>,
169    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
170    pub meta: Option<JsonValue>,
171}
172
173#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
174#[serde(rename_all = "camelCase")]
175pub struct McpListResourcesResult {
176    pub resources: Vec<McpResource>,
177    #[serde(skip_serializing_if = "Option::is_none")]
178    pub next_cursor: Option<String>,
179}
180
181#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
182#[serde(rename_all = "camelCase")]
183pub struct McpReadResourceParams {
184    pub uri: String,
185}
186
187#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
188pub struct McpResourceContents {
189    pub uri: String,
190    #[serde(rename = "mimeType", skip_serializing_if = "Option::is_none")]
191    pub mime_type: Option<String>,
192    #[serde(skip_serializing_if = "Option::is_none")]
193    pub text: Option<String>,
194    #[serde(skip_serializing_if = "Option::is_none")]
195    pub blob: Option<String>,
196    #[serde(rename = "_meta", skip_serializing_if = "Option::is_none")]
197    pub meta: Option<JsonValue>,
198}
199
200#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
201pub struct McpReadResourceResult {
202    pub contents: Vec<McpResourceContents>,
203}
204
205#[derive(Debug, Clone, PartialEq)]
206pub struct ManagedMcpTool {
207    pub server_name: String,
208    pub qualified_name: String,
209    pub raw_name: String,
210    pub tool: McpTool,
211}
212
213#[derive(Debug, Clone, PartialEq, Eq)]
214pub struct UnsupportedMcpServer {
215    pub server_name: String,
216    pub transport: McpTransport,
217    pub reason: String,
218}
219
220#[derive(Debug)]
221pub enum McpServerManagerError {
222    Io(io::Error),
223    JsonRpc {
224        server_name: String,
225        method: &'static str,
226        error: JsonRpcError,
227    },
228    InvalidResponse {
229        server_name: String,
230        method: &'static str,
231        details: String,
232    },
233    UnknownTool {
234        qualified_name: String,
235    },
236    UnknownServer {
237        server_name: String,
238    },
239}
240
241impl std::fmt::Display for McpServerManagerError {
242    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
243        match self {
244            Self::Io(error) => write!(f, "{error}"),
245            Self::JsonRpc {
246                server_name,
247                method,
248                error,
249            } => write!(
250                f,
251                "MCP server `{server_name}` returned JSON-RPC error for {method}: {} ({})",
252                error.message, error.code
253            ),
254            Self::InvalidResponse {
255                server_name,
256                method,
257                details,
258            } => write!(
259                f,
260                "MCP server `{server_name}` returned invalid response for {method}: {details}"
261            ),
262            Self::UnknownTool { qualified_name } => {
263                write!(f, "unknown MCP tool `{qualified_name}`")
264            }
265            Self::UnknownServer { server_name } => write!(f, "unknown MCP server `{server_name}`"),
266        }
267    }
268}
269
270impl std::error::Error for McpServerManagerError {
271    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
272        match self {
273            Self::Io(error) => Some(error),
274            Self::JsonRpc { .. }
275            | Self::InvalidResponse { .. }
276            | Self::UnknownTool { .. }
277            | Self::UnknownServer { .. } => None,
278        }
279    }
280}
281
282impl From<io::Error> for McpServerManagerError {
283    fn from(value: io::Error) -> Self {
284        Self::Io(value)
285    }
286}
287
288#[derive(Debug, Clone, PartialEq, Eq)]
289struct ToolRoute {
290    server_name: String,
291    raw_name: String,
292}
293
294#[derive(Debug)]
295struct ManagedMcpServer {
296    bootstrap: McpClientBootstrap,
297    process: Option<McpStdioProcess>,
298    initialized: bool,
299}
300
301impl ManagedMcpServer {
302    fn new(bootstrap: McpClientBootstrap) -> Self {
303        Self {
304            bootstrap,
305            process: None,
306            initialized: false,
307        }
308    }
309}
310
311#[derive(Debug)]
312pub struct McpServerManager {
313    servers: BTreeMap<String, ManagedMcpServer>,
314    unsupported_servers: Vec<UnsupportedMcpServer>,
315    tool_index: BTreeMap<String, ToolRoute>,
316    next_request_id: u64,
317}
318
319impl McpServerManager {
320    #[must_use]
321    pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
322        Self::from_servers(config.mcp().servers())
323    }
324
325    #[must_use]
326    pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
327        let mut managed_servers = BTreeMap::new();
328        let mut unsupported_servers = Vec::new();
329
330        for (server_name, server_config) in servers {
331            if server_config.transport() == McpTransport::Stdio {
332                let bootstrap = McpClientBootstrap::from_scoped_config(server_name, server_config);
333                managed_servers.insert(server_name.clone(), ManagedMcpServer::new(bootstrap));
334            } else {
335                unsupported_servers.push(UnsupportedMcpServer {
336                    server_name: server_name.clone(),
337                    transport: server_config.transport(),
338                    reason: format!(
339                        "transport {:?} is not supported by McpServerManager",
340                        server_config.transport()
341                    ),
342                });
343            }
344        }
345
346        Self {
347            servers: managed_servers,
348            unsupported_servers,
349            tool_index: BTreeMap::new(),
350            next_request_id: 1,
351        }
352    }
353
354    #[must_use]
355    pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
356        &self.unsupported_servers
357    }
358
359    pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
360        let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
361        let mut discovered_tools = Vec::new();
362
363        for server_name in server_names {
364            self.ensure_server_ready(&server_name).await?;
365            self.clear_routes_for_server(&server_name);
366
367            let mut cursor = None;
368            loop {
369                let request_id = self.take_request_id();
370                let response = {
371                    let server = self.server_mut(&server_name)?;
372                    let process = server.process.as_mut().ok_or_else(|| {
373                        McpServerManagerError::InvalidResponse {
374                            server_name: server_name.clone(),
375                            method: "tools/list",
376                            details: "server process missing after initialization".to_string(),
377                        }
378                    })?;
379                    process
380                        .list_tools(
381                            request_id,
382                            Some(McpListToolsParams {
383                                cursor: cursor.clone(),
384                            }),
385                        )
386                        .await?
387                };
388
389                if let Some(error) = response.error {
390                    return Err(McpServerManagerError::JsonRpc {
391                        server_name: server_name.clone(),
392                        method: "tools/list",
393                        error,
394                    });
395                }
396
397                let result =
398                    response
399                        .result
400                        .ok_or_else(|| McpServerManagerError::InvalidResponse {
401                            server_name: server_name.clone(),
402                            method: "tools/list",
403                            details: "missing result payload".to_string(),
404                        })?;
405
406                for tool in result.tools {
407                    let qualified_name = mcp_tool_name(&server_name, &tool.name);
408                    self.tool_index.insert(
409                        qualified_name.clone(),
410                        ToolRoute {
411                            server_name: server_name.clone(),
412                            raw_name: tool.name.clone(),
413                        },
414                    );
415                    discovered_tools.push(ManagedMcpTool {
416                        server_name: server_name.clone(),
417                        qualified_name,
418                        raw_name: tool.name.clone(),
419                        tool,
420                    });
421                }
422
423                match result.next_cursor {
424                    Some(next_cursor) => cursor = Some(next_cursor),
425                    None => break,
426                }
427            }
428        }
429
430        Ok(discovered_tools)
431    }
432
433    pub async fn call_tool(
434        &mut self,
435        qualified_tool_name: &str,
436        arguments: Option<JsonValue>,
437    ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
438        let route = self
439            .tool_index
440            .get(qualified_tool_name)
441            .cloned()
442            .ok_or_else(|| McpServerManagerError::UnknownTool {
443                qualified_name: qualified_tool_name.to_string(),
444            })?;
445
446        self.ensure_server_ready(&route.server_name).await?;
447        let request_id = self.take_request_id();
448        let response =
449            {
450                let server = self.server_mut(&route.server_name)?;
451                let process = server.process.as_mut().ok_or_else(|| {
452                    McpServerManagerError::InvalidResponse {
453                        server_name: route.server_name.clone(),
454                        method: "tools/call",
455                        details: "server process missing after initialization".to_string(),
456                    }
457                })?;
458                process
459                    .call_tool(
460                        request_id,
461                        McpToolCallParams {
462                            name: route.raw_name,
463                            arguments,
464                            meta: None,
465                        },
466                    )
467                    .await?
468            };
469        Ok(response)
470    }
471
472    pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
473        let server_names = self.servers.keys().cloned().collect::<Vec<_>>();
474        for server_name in server_names {
475            let server = self.server_mut(&server_name)?;
476            if let Some(process) = server.process.as_mut() {
477                process.shutdown().await?;
478            }
479            server.process = None;
480            server.initialized = false;
481        }
482        Ok(())
483    }
484
485    fn clear_routes_for_server(&mut self, server_name: &str) {
486        self.tool_index
487            .retain(|_, route| route.server_name != server_name);
488    }
489
490    fn server_mut(
491        &mut self,
492        server_name: &str,
493    ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
494        self.servers
495            .get_mut(server_name)
496            .ok_or_else(|| McpServerManagerError::UnknownServer {
497                server_name: server_name.to_string(),
498            })
499    }
500
501    fn take_request_id(&mut self) -> JsonRpcId {
502        let id = self.next_request_id;
503        self.next_request_id = self.next_request_id.saturating_add(1);
504        JsonRpcId::Number(id)
505    }
506
507    async fn ensure_server_ready(
508        &mut self,
509        server_name: &str,
510    ) -> Result<(), McpServerManagerError> {
511        let needs_spawn = self
512            .servers
513            .get(server_name)
514            .map(|server| server.process.is_none())
515            .ok_or_else(|| McpServerManagerError::UnknownServer {
516                server_name: server_name.to_string(),
517            })?;
518
519        if needs_spawn {
520            let server = self.server_mut(server_name)?;
521            server.process = Some(spawn_mcp_stdio_process(&server.bootstrap)?);
522            server.initialized = false;
523        }
524
525        let needs_initialize = self
526            .servers
527            .get(server_name)
528            .map(|server| !server.initialized)
529            .ok_or_else(|| McpServerManagerError::UnknownServer {
530                server_name: server_name.to_string(),
531            })?;
532
533        if needs_initialize {
534            let request_id = self.take_request_id();
535            let response = {
536                let server = self.server_mut(server_name)?;
537                let process = server.process.as_mut().ok_or_else(|| {
538                    McpServerManagerError::InvalidResponse {
539                        server_name: server_name.to_string(),
540                        method: "initialize",
541                        details: "server process missing before initialize".to_string(),
542                    }
543                })?;
544                process
545                    .initialize(request_id, default_initialize_params())
546                    .await?
547            };
548
549            if let Some(error) = response.error {
550                return Err(McpServerManagerError::JsonRpc {
551                    server_name: server_name.to_string(),
552                    method: "initialize",
553                    error,
554                });
555            }
556
557            if response.result.is_none() {
558                return Err(McpServerManagerError::InvalidResponse {
559                    server_name: server_name.to_string(),
560                    method: "initialize",
561                    details: "missing result payload".to_string(),
562                });
563            }
564
565            let server = self.server_mut(server_name)?;
566            server.initialized = true;
567        }
568
569        Ok(())
570    }
571}
572
573#[derive(Debug)]
574pub struct McpStdioProcess {
575    child: Child,
576    stdin: ChildStdin,
577    stdout: BufReader<ChildStdout>,
578}
579
580impl McpStdioProcess {
581    pub fn spawn(transport: &McpStdioTransport) -> io::Result<Self> {
582        let mut command = Command::new(&transport.command);
583        command
584            .args(&transport.args)
585            .stdin(Stdio::piped())
586            .stdout(Stdio::piped())
587            .stderr(Stdio::inherit());
588        apply_env(&mut command, &transport.env);
589
590        let mut child = command.spawn()?;
591        let stdin = child
592            .stdin
593            .take()
594            .ok_or_else(|| io::Error::other("stdio MCP process missing stdin pipe"))?;
595        let stdout = child
596            .stdout
597            .take()
598            .ok_or_else(|| io::Error::other("stdio MCP process missing stdout pipe"))?;
599
600        Ok(Self {
601            child,
602            stdin,
603            stdout: BufReader::new(stdout),
604        })
605    }
606
607    pub async fn write_all(&mut self, bytes: &[u8]) -> io::Result<()> {
608        self.stdin.write_all(bytes).await
609    }
610
611    pub async fn flush(&mut self) -> io::Result<()> {
612        self.stdin.flush().await
613    }
614
615    pub async fn write_line(&mut self, line: &str) -> io::Result<()> {
616        self.write_all(line.as_bytes()).await?;
617        self.write_all(b"\n").await?;
618        self.flush().await
619    }
620
621    pub async fn read_line(&mut self) -> io::Result<String> {
622        let mut line = String::new();
623        let bytes_read = self.stdout.read_line(&mut line).await?;
624        if bytes_read == 0 {
625            return Err(io::Error::new(
626                io::ErrorKind::UnexpectedEof,
627                "MCP stdio stream closed while reading line",
628            ));
629        }
630        Ok(line)
631    }
632
633    pub async fn read_available(&mut self) -> io::Result<Vec<u8>> {
634        let mut buffer = vec![0_u8; 4096];
635        let read = self.stdout.read(&mut buffer).await?;
636        buffer.truncate(read);
637        Ok(buffer)
638    }
639
640    pub async fn write_frame(&mut self, payload: &[u8]) -> io::Result<()> {
641        let encoded = encode_frame(payload);
642        self.write_all(&encoded).await?;
643        self.flush().await
644    }
645
646    pub async fn read_frame(&mut self) -> io::Result<Vec<u8>> {
647        let mut content_length = None;
648        loop {
649            let mut line = String::new();
650            let bytes_read = self.stdout.read_line(&mut line).await?;
651            if bytes_read == 0 {
652                return Err(io::Error::new(
653                    io::ErrorKind::UnexpectedEof,
654                    "MCP stdio stream closed while reading headers",
655                ));
656            }
657            if line == "\r\n" {
658                break;
659            }
660            if let Some(value) = line.strip_prefix("Content-Length:") {
661                let parsed = value
662                    .trim()
663                    .parse::<usize>()
664                    .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
665                content_length = Some(parsed);
666            }
667        }
668
669        let content_length = content_length.ok_or_else(|| {
670            io::Error::new(io::ErrorKind::InvalidData, "missing Content-Length header")
671        })?;
672        let mut payload = vec![0_u8; content_length];
673        self.stdout.read_exact(&mut payload).await?;
674        Ok(payload)
675    }
676
677    pub async fn write_jsonrpc_message<T: Serialize>(&mut self, message: &T) -> io::Result<()> {
678        let body = serde_json::to_vec(message)
679            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
680        self.write_frame(&body).await
681    }
682
683    pub async fn read_jsonrpc_message<T: DeserializeOwned>(&mut self) -> io::Result<T> {
684        let payload = self.read_frame().await?;
685        serde_json::from_slice(&payload)
686            .map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))
687    }
688
689    pub async fn send_request<T: Serialize>(
690        &mut self,
691        request: &JsonRpcRequest<T>,
692    ) -> io::Result<()> {
693        self.write_jsonrpc_message(request).await
694    }
695
696    pub async fn read_response<T: DeserializeOwned>(&mut self) -> io::Result<JsonRpcResponse<T>> {
697        self.read_jsonrpc_message().await
698    }
699
700    pub async fn request<TParams: Serialize, TResult: DeserializeOwned>(
701        &mut self,
702        id: JsonRpcId,
703        method: impl Into<String>,
704        params: Option<TParams>,
705    ) -> io::Result<JsonRpcResponse<TResult>> {
706        let request = JsonRpcRequest::new(id, method, params);
707        self.send_request(&request).await?;
708        self.read_response().await
709    }
710
711    pub async fn initialize(
712        &mut self,
713        id: JsonRpcId,
714        params: McpInitializeParams,
715    ) -> io::Result<JsonRpcResponse<McpInitializeResult>> {
716        self.request(id, "initialize", Some(params)).await
717    }
718
719    pub async fn list_tools(
720        &mut self,
721        id: JsonRpcId,
722        params: Option<McpListToolsParams>,
723    ) -> io::Result<JsonRpcResponse<McpListToolsResult>> {
724        self.request(id, "tools/list", params).await
725    }
726
727    pub async fn call_tool(
728        &mut self,
729        id: JsonRpcId,
730        params: McpToolCallParams,
731    ) -> io::Result<JsonRpcResponse<McpToolCallResult>> {
732        self.request(id, "tools/call", Some(params)).await
733    }
734
735    pub async fn list_resources(
736        &mut self,
737        id: JsonRpcId,
738        params: Option<McpListResourcesParams>,
739    ) -> io::Result<JsonRpcResponse<McpListResourcesResult>> {
740        self.request(id, "resources/list", params).await
741    }
742
743    pub async fn read_resource(
744        &mut self,
745        id: JsonRpcId,
746        params: McpReadResourceParams,
747    ) -> io::Result<JsonRpcResponse<McpReadResourceResult>> {
748        self.request(id, "resources/read", Some(params)).await
749    }
750
751    pub async fn terminate(&mut self) -> io::Result<()> {
752        self.child.kill().await
753    }
754
755    pub async fn wait(&mut self) -> io::Result<std::process::ExitStatus> {
756        self.child.wait().await
757    }
758
759    async fn shutdown(&mut self) -> io::Result<()> {
760        if self.child.try_wait()?.is_none() {
761            self.child.kill().await?;
762        }
763        let _ = self.child.wait().await?;
764        Ok(())
765    }
766}
767
768pub fn spawn_mcp_stdio_process(bootstrap: &McpClientBootstrap) -> io::Result<McpStdioProcess> {
769    match &bootstrap.transport {
770        McpClientTransport::Stdio(transport) => McpStdioProcess::spawn(transport),
771        other => Err(io::Error::new(
772            io::ErrorKind::InvalidInput,
773            format!(
774                "MCP bootstrap transport for {} is not stdio: {other:?}",
775                bootstrap.server_name
776            ),
777        )),
778    }
779}
780
781fn apply_env(command: &mut Command, env: &BTreeMap<String, String>) {
782    for (key, value) in env {
783        command.env(key, value);
784    }
785}
786
787fn encode_frame(payload: &[u8]) -> Vec<u8> {
788    let header = format!("Content-Length: {}\r\n\r\n", payload.len());
789    let mut framed = header.into_bytes();
790    framed.extend_from_slice(payload);
791    framed
792}
793
794fn default_initialize_params() -> McpInitializeParams {
795    McpInitializeParams {
796        protocol_version: "2025-03-26".to_string(),
797        capabilities: JsonValue::Object(serde_json::Map::new()),
798        client_info: McpInitializeClientInfo {
799            name: "runtime".to_string(),
800            version: env!("CARGO_PKG_VERSION").to_string(),
801        },
802    }
803}
804
805#[cfg(test)]
806mod tests {
807    use std::collections::BTreeMap;
808    use std::fs;
809    use std::io::ErrorKind;
810    use std::os::unix::fs::PermissionsExt;
811    use std::path::{Path, PathBuf};
812    use std::process::Command;
813    use std::time::{SystemTime, UNIX_EPOCH};
814
815    use serde_json::json;
816    use tokio::runtime::Builder;
817
818    use crate::config::{
819        ConfigSource, McpRemoteServerConfig, McpSdkServerConfig, McpServerConfig,
820        McpStdioServerConfig, McpWebSocketServerConfig, ScopedMcpServerConfig,
821    };
822    use crate::mcp::mcp_tool_name;
823    use crate::mcp_client::McpClientBootstrap;
824
825    use super::{
826        spawn_mcp_stdio_process, JsonRpcId, JsonRpcRequest, JsonRpcResponse,
827        McpInitializeClientInfo, McpInitializeParams, McpInitializeResult, McpInitializeServerInfo,
828        McpListToolsResult, McpReadResourceParams, McpReadResourceResult, McpServerManager,
829        McpServerManagerError, McpStdioProcess, McpTool, McpToolCallParams,
830    };
831
832    fn temp_dir() -> PathBuf {
833        let nanos = SystemTime::now()
834            .duration_since(UNIX_EPOCH)
835            .expect("time should be after epoch")
836            .as_nanos();
837        std::env::temp_dir().join(format!("runtime-mcp-stdio-{nanos}"))
838    }
839
840    fn write_echo_script() -> PathBuf {
841        let root = temp_dir();
842        fs::create_dir_all(&root).expect("temp dir");
843        let script_path = root.join("echo-mcp.sh");
844        fs::write(
845            &script_path,
846            "#!/bin/sh\nprintf 'READY:%s\\n' \"$MCP_TEST_TOKEN\"\nIFS= read -r line\nprintf 'ECHO:%s\\n' \"$line\"\n",
847        )
848        .expect("write script");
849        let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
850        permissions.set_mode(0o755);
851        fs::set_permissions(&script_path, permissions).expect("chmod");
852        script_path
853    }
854
855    fn write_jsonrpc_script() -> PathBuf {
856        let root = temp_dir();
857        fs::create_dir_all(&root).expect("temp dir");
858        let script_path = root.join("jsonrpc-mcp.py");
859        let script = [
860            "#!/usr/bin/env python3",
861            "import json, sys",
862            "header = b''",
863            r"while not header.endswith(b'\r\n\r\n'):",
864            "    chunk = sys.stdin.buffer.read(1)",
865            "    if not chunk:",
866            "        raise SystemExit(1)",
867            "    header += chunk",
868            "length = 0",
869            r"for line in header.decode().split('\r\n'):",
870            r"    if line.lower().startswith('content-length:'):",
871            r"        length = int(line.split(':', 1)[1].strip())",
872            "payload = sys.stdin.buffer.read(length)",
873            "request = json.loads(payload.decode())",
874            r"assert request['jsonrpc'] == '2.0'",
875            r"assert request['method'] == 'initialize'",
876            r"response = json.dumps({",
877            r"    'jsonrpc': '2.0',",
878            r"    'id': request['id'],",
879            r"    'result': {",
880            r"        'protocolVersion': request['params']['protocolVersion'],",
881            r"        'capabilities': {'tools': {}},",
882            r"        'serverInfo': {'name': 'fake-mcp', 'version': '0.1.0'}",
883            r"    }",
884            r"}).encode()",
885            r"sys.stdout.buffer.write(f'Content-Length: {len(response)}\r\n\r\n'.encode() + response)",
886            "sys.stdout.buffer.flush()",
887            "",
888        ]
889        .join("\n");
890        fs::write(&script_path, script).expect("write script");
891        let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
892        permissions.set_mode(0o755);
893        fs::set_permissions(&script_path, permissions).expect("chmod");
894        script_path
895    }
896
897    #[allow(clippy::too_many_lines)]
898    fn write_mcp_server_script() -> PathBuf {
899        let root = temp_dir();
900        fs::create_dir_all(&root).expect("temp dir");
901        let script_path = root.join("fake-mcp-server.py");
902        let script = [
903            "#!/usr/bin/env python3",
904            "import json, sys",
905            "",
906            "def read_message():",
907            "    header = b''",
908            r"    while not header.endswith(b'\r\n\r\n'):",
909            "        chunk = sys.stdin.buffer.read(1)",
910            "        if not chunk:",
911            "            return None",
912            "        header += chunk",
913            "    length = 0",
914            r"    for line in header.decode().split('\r\n'):",
915            r"        if line.lower().startswith('content-length:'):",
916            r"            length = int(line.split(':', 1)[1].strip())",
917            "    payload = sys.stdin.buffer.read(length)",
918            "    return json.loads(payload.decode())",
919            "",
920            "def send_message(message):",
921            "    payload = json.dumps(message).encode()",
922            r"    sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
923            "    sys.stdout.buffer.flush()",
924            "",
925            "while True:",
926            "    request = read_message()",
927            "    if request is None:",
928            "        break",
929            "    method = request['method']",
930            "    if method == 'initialize':",
931            "        send_message({",
932            "            'jsonrpc': '2.0',",
933            "            'id': request['id'],",
934            "            'result': {",
935            "                'protocolVersion': request['params']['protocolVersion'],",
936            "                'capabilities': {'tools': {}, 'resources': {}},",
937            "                'serverInfo': {'name': 'fake-mcp', 'version': '0.2.0'}",
938            "            }",
939            "        })",
940            "    elif method == 'tools/list':",
941            "        send_message({",
942            "            'jsonrpc': '2.0',",
943            "            'id': request['id'],",
944            "            'result': {",
945            "                'tools': [",
946            "                    {",
947            "                        'name': 'echo',",
948            "                        'description': 'Echoes text',",
949            "                        'inputSchema': {",
950            "                            'type': 'object',",
951            "                            'properties': {'text': {'type': 'string'}},",
952            "                            'required': ['text']",
953            "                        }",
954            "                    }",
955            "                ]",
956            "            }",
957            "        })",
958            "    elif method == 'tools/call':",
959            "        args = request['params'].get('arguments') or {}",
960            "        if request['params']['name'] == 'fail':",
961            "            send_message({",
962            "                'jsonrpc': '2.0',",
963            "                'id': request['id'],",
964            "                'error': {'code': -32001, 'message': 'tool failed'},",
965            "            })",
966            "        else:",
967            "            text = args.get('text', '')",
968            "            send_message({",
969            "                'jsonrpc': '2.0',",
970            "                'id': request['id'],",
971            "                'result': {",
972            "                    'content': [{'type': 'text', 'text': f'echo:{text}'}],",
973            "                    'structuredContent': {'echoed': text},",
974            "                    'isError': False",
975            "                }",
976            "            })",
977            "    elif method == 'resources/list':",
978            "        send_message({",
979            "            'jsonrpc': '2.0',",
980            "            'id': request['id'],",
981            "            'result': {",
982            "                'resources': [",
983            "                    {",
984            "                        'uri': 'file://guide.txt',",
985            "                        'name': 'guide',",
986            "                        'description': 'Guide text',",
987            "                        'mimeType': 'text/plain'",
988            "                    }",
989            "                ]",
990            "            }",
991            "        })",
992            "    elif method == 'resources/read':",
993            "        uri = request['params']['uri']",
994            "        send_message({",
995            "            'jsonrpc': '2.0',",
996            "            'id': request['id'],",
997            "            'result': {",
998            "                'contents': [",
999            "                    {",
1000            "                        'uri': uri,",
1001            "                        'mimeType': 'text/plain',",
1002            "                        'text': f'contents for {uri}'",
1003            "                    }",
1004            "                ]",
1005            "            }",
1006            "        })",
1007            "    else:",
1008            "        send_message({",
1009            "            'jsonrpc': '2.0',",
1010            "            'id': request['id'],",
1011            "            'error': {'code': -32601, 'message': f'unknown method: {method}'},",
1012            "        })",
1013            "",
1014        ]
1015        .join("\n");
1016        fs::write(&script_path, script).expect("write script");
1017        let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
1018        permissions.set_mode(0o755);
1019        fs::set_permissions(&script_path, permissions).expect("chmod");
1020        script_path
1021    }
1022
1023    #[allow(clippy::too_many_lines)]
1024    fn write_manager_mcp_server_script() -> PathBuf {
1025        let root = temp_dir();
1026        fs::create_dir_all(&root).expect("temp dir");
1027        let script_path = root.join("manager-mcp-server.py");
1028        let script = [
1029            "#!/usr/bin/env python3",
1030            "import json, os, sys",
1031            "",
1032            "LABEL = os.environ.get('MCP_SERVER_LABEL', 'server')",
1033            "LOG_PATH = os.environ.get('MCP_LOG_PATH')",
1034            "initialize_count = 0",
1035            "",
1036            "def log(method):",
1037            "    if LOG_PATH:",
1038            "        with open(LOG_PATH, 'a', encoding='utf-8') as handle:",
1039            "            handle.write(f'{method}\\n')",
1040            "",
1041            "def read_message():",
1042            "    header = b''",
1043            r"    while not header.endswith(b'\r\n\r\n'):",
1044            "        chunk = sys.stdin.buffer.read(1)",
1045            "        if not chunk:",
1046            "            return None",
1047            "        header += chunk",
1048            "    length = 0",
1049            r"    for line in header.decode().split('\r\n'):",
1050            r"        if line.lower().startswith('content-length:'):",
1051            r"            length = int(line.split(':', 1)[1].strip())",
1052            "    payload = sys.stdin.buffer.read(length)",
1053            "    return json.loads(payload.decode())",
1054            "",
1055            "def send_message(message):",
1056            "    payload = json.dumps(message).encode()",
1057            r"    sys.stdout.buffer.write(f'Content-Length: {len(payload)}\r\n\r\n'.encode() + payload)",
1058            "    sys.stdout.buffer.flush()",
1059            "",
1060            "while True:",
1061            "    request = read_message()",
1062            "    if request is None:",
1063            "        break",
1064            "    method = request['method']",
1065            "    log(method)",
1066            "    if method == 'initialize':",
1067            "        initialize_count += 1",
1068            "        send_message({",
1069            "            'jsonrpc': '2.0',",
1070            "            'id': request['id'],",
1071            "            'result': {",
1072            "                'protocolVersion': request['params']['protocolVersion'],",
1073            "                'capabilities': {'tools': {}},",
1074            "                'serverInfo': {'name': LABEL, 'version': '1.0.0'}",
1075            "            }",
1076            "        })",
1077            "    elif method == 'tools/list':",
1078            "        send_message({",
1079            "            'jsonrpc': '2.0',",
1080            "            'id': request['id'],",
1081            "            'result': {",
1082            "                'tools': [",
1083            "                    {",
1084            "                        'name': 'echo',",
1085            "                        'description': f'Echo tool for {LABEL}',",
1086            "                        'inputSchema': {",
1087            "                            'type': 'object',",
1088            "                            'properties': {'text': {'type': 'string'}},",
1089            "                            'required': ['text']",
1090            "                        }",
1091            "                    }",
1092            "                ]",
1093            "            }",
1094            "        })",
1095            "    elif method == 'tools/call':",
1096            "        args = request['params'].get('arguments') or {}",
1097            "        text = args.get('text', '')",
1098            "        send_message({",
1099            "            'jsonrpc': '2.0',",
1100            "            'id': request['id'],",
1101            "            'result': {",
1102            "                'content': [{'type': 'text', 'text': f'{LABEL}:{text}'}],",
1103            "                'structuredContent': {",
1104            "                    'server': LABEL,",
1105            "                    'echoed': text,",
1106            "                    'initializeCount': initialize_count",
1107            "                },",
1108            "                'isError': False",
1109            "            }",
1110            "        })",
1111            "    else:",
1112            "        send_message({",
1113            "            'jsonrpc': '2.0',",
1114            "            'id': request['id'],",
1115            "            'error': {'code': -32601, 'message': f'unknown method: {method}'},",
1116            "        })",
1117            "",
1118        ]
1119        .join("\n");
1120        fs::write(&script_path, script).expect("write script");
1121        let mut permissions = fs::metadata(&script_path).expect("metadata").permissions();
1122        permissions.set_mode(0o755);
1123        fs::set_permissions(&script_path, permissions).expect("chmod");
1124        script_path
1125    }
1126
1127    fn sample_bootstrap(script_path: &Path) -> McpClientBootstrap {
1128        let config = ScopedMcpServerConfig {
1129            scope: ConfigSource::Local,
1130            config: McpServerConfig::Stdio(McpStdioServerConfig {
1131                command: "/bin/sh".to_string(),
1132                args: vec![script_path.to_string_lossy().into_owned()],
1133                env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "secret-value".to_string())]),
1134            }),
1135        };
1136        McpClientBootstrap::from_scoped_config("stdio server", &config)
1137    }
1138
1139    fn script_transport(script_path: &Path) -> crate::mcp_client::McpStdioTransport {
1140        crate::mcp_client::McpStdioTransport {
1141            command: python_command(),
1142            args: vec![script_path.to_string_lossy().into_owned()],
1143            env: BTreeMap::new(),
1144        }
1145    }
1146
1147    fn python_command() -> String {
1148        for key in ["MCP_TEST_PYTHON", "PYTHON3", "PYTHON"] {
1149            if let Ok(value) = std::env::var(key) {
1150                if !value.trim().is_empty() {
1151                    return value;
1152                }
1153            }
1154        }
1155
1156        for candidate in ["python3", "python"] {
1157            if Command::new(candidate).arg("--version").output().is_ok() {
1158                return candidate.to_string();
1159            }
1160        }
1161
1162        panic!("expected a Python interpreter for MCP stdio tests")
1163    }
1164
1165    fn cleanup_script(script_path: &Path) {
1166        if let Err(error) = fs::remove_file(script_path) {
1167            assert_eq!(error.kind(), std::io::ErrorKind::NotFound, "cleanup script");
1168        }
1169        if let Err(error) = fs::remove_dir_all(script_path.parent().expect("script parent")) {
1170            assert_eq!(error.kind(), std::io::ErrorKind::NotFound, "cleanup dir");
1171        }
1172    }
1173
1174    fn manager_server_config(
1175        script_path: &Path,
1176        label: &str,
1177        log_path: &Path,
1178    ) -> ScopedMcpServerConfig {
1179        ScopedMcpServerConfig {
1180            scope: ConfigSource::Local,
1181            config: McpServerConfig::Stdio(McpStdioServerConfig {
1182                command: python_command(),
1183                args: vec![script_path.to_string_lossy().into_owned()],
1184                env: BTreeMap::from([
1185                    ("MCP_SERVER_LABEL".to_string(), label.to_string()),
1186                    (
1187                        "MCP_LOG_PATH".to_string(),
1188                        log_path.to_string_lossy().into_owned(),
1189                    ),
1190                ]),
1191            }),
1192        }
1193    }
1194
1195    #[test]
1196    fn spawns_stdio_process_and_round_trips_io() {
1197        let runtime = Builder::new_current_thread()
1198            .enable_all()
1199            .build()
1200            .expect("runtime");
1201        runtime.block_on(async {
1202            let script_path = write_echo_script();
1203            let bootstrap = sample_bootstrap(&script_path);
1204            let mut process = spawn_mcp_stdio_process(&bootstrap).expect("spawn stdio process");
1205
1206            let ready = process.read_line().await.expect("read ready");
1207            assert_eq!(ready, "READY:secret-value\n");
1208
1209            process
1210                .write_line("ping from client")
1211                .await
1212                .expect("write line");
1213
1214            let echoed = process.read_line().await.expect("read echo");
1215            assert_eq!(echoed, "ECHO:ping from client\n");
1216
1217            let status = process.wait().await.expect("wait for exit");
1218            assert!(status.success());
1219
1220            cleanup_script(&script_path);
1221        });
1222    }
1223
1224    #[test]
1225    fn rejects_non_stdio_bootstrap() {
1226        let config = ScopedMcpServerConfig {
1227            scope: ConfigSource::Local,
1228            config: McpServerConfig::Sdk(crate::config::McpSdkServerConfig {
1229                name: "sdk-server".to_string(),
1230            }),
1231        };
1232        let bootstrap = McpClientBootstrap::from_scoped_config("sdk server", &config);
1233        let error = spawn_mcp_stdio_process(&bootstrap).expect_err("non-stdio should fail");
1234        assert_eq!(error.kind(), ErrorKind::InvalidInput);
1235    }
1236
1237    #[test]
1238    fn round_trips_initialize_request_and_response_over_stdio_frames() {
1239        let runtime = Builder::new_current_thread()
1240            .enable_all()
1241            .build()
1242            .expect("runtime");
1243        runtime.block_on(async {
1244            let script_path = write_jsonrpc_script();
1245            let transport = script_transport(&script_path);
1246            let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
1247
1248            let response = process
1249                .initialize(
1250                    JsonRpcId::Number(1),
1251                    McpInitializeParams {
1252                        protocol_version: "2025-03-26".to_string(),
1253                        capabilities: json!({"roots": {}}),
1254                        client_info: McpInitializeClientInfo {
1255                            name: "runtime-tests".to_string(),
1256                            version: "0.1.0".to_string(),
1257                        },
1258                    },
1259                )
1260                .await
1261                .expect("initialize roundtrip");
1262
1263            assert_eq!(response.id, JsonRpcId::Number(1));
1264            assert_eq!(response.error, None);
1265            assert_eq!(
1266                response.result,
1267                Some(McpInitializeResult {
1268                    protocol_version: "2025-03-26".to_string(),
1269                    capabilities: json!({"tools": {}}),
1270                    server_info: McpInitializeServerInfo {
1271                        name: "fake-mcp".to_string(),
1272                        version: "0.1.0".to_string(),
1273                    },
1274                })
1275            );
1276
1277            let status = process.wait().await.expect("wait for exit");
1278            assert!(status.success());
1279
1280            cleanup_script(&script_path);
1281        });
1282    }
1283
1284    #[test]
1285    fn write_jsonrpc_request_emits_content_length_frame() {
1286        let runtime = Builder::new_current_thread()
1287            .enable_all()
1288            .build()
1289            .expect("runtime");
1290        runtime.block_on(async {
1291            let script_path = write_jsonrpc_script();
1292            let transport = script_transport(&script_path);
1293            let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
1294            let request = JsonRpcRequest::new(
1295                JsonRpcId::Number(7),
1296                "initialize",
1297                Some(json!({
1298                    "protocolVersion": "2025-03-26",
1299                    "capabilities": {},
1300                    "clientInfo": {"name": "runtime-tests", "version": "0.1.0"}
1301                })),
1302            );
1303
1304            process.send_request(&request).await.expect("send request");
1305            let response: JsonRpcResponse<serde_json::Value> =
1306                process.read_response().await.expect("read response");
1307
1308            assert_eq!(response.id, JsonRpcId::Number(7));
1309            assert_eq!(response.jsonrpc, "2.0");
1310
1311            let status = process.wait().await.expect("wait for exit");
1312            assert!(status.success());
1313
1314            cleanup_script(&script_path);
1315        });
1316    }
1317
1318    #[test]
1319    fn direct_spawn_uses_transport_env() {
1320        let runtime = Builder::new_current_thread()
1321            .enable_all()
1322            .build()
1323            .expect("runtime");
1324        runtime.block_on(async {
1325            let script_path = write_echo_script();
1326            let transport = crate::mcp_client::McpStdioTransport {
1327                command: "/bin/sh".to_string(),
1328                args: vec![script_path.to_string_lossy().into_owned()],
1329                env: BTreeMap::from([("MCP_TEST_TOKEN".to_string(), "direct-secret".to_string())]),
1330            };
1331            let mut process = McpStdioProcess::spawn(&transport).expect("spawn transport directly");
1332            let ready = process.read_available().await.expect("read ready");
1333            assert_eq!(String::from_utf8_lossy(&ready), "READY:direct-secret\n");
1334            process.terminate().await.expect("terminate child");
1335            let _ = process.wait().await.expect("wait after kill");
1336
1337            cleanup_script(&script_path);
1338        });
1339    }
1340
1341    #[test]
1342    fn lists_tools_calls_tool_and_reads_resources_over_jsonrpc() {
1343        let runtime = Builder::new_current_thread()
1344            .enable_all()
1345            .build()
1346            .expect("runtime");
1347        runtime.block_on(async {
1348            let script_path = write_mcp_server_script();
1349            let transport = script_transport(&script_path);
1350            let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
1351
1352            let tools = process
1353                .list_tools(JsonRpcId::Number(2), None)
1354                .await
1355                .expect("list tools");
1356            assert_eq!(tools.error, None);
1357            assert_eq!(tools.id, JsonRpcId::Number(2));
1358            assert_eq!(
1359                tools.result,
1360                Some(McpListToolsResult {
1361                    tools: vec![McpTool {
1362                        name: "echo".to_string(),
1363                        description: Some("Echoes text".to_string()),
1364                        input_schema: Some(json!({
1365                            "type": "object",
1366                            "properties": {"text": {"type": "string"}},
1367                            "required": ["text"]
1368                        })),
1369                        annotations: None,
1370                        meta: None,
1371                    }],
1372                    next_cursor: None,
1373                })
1374            );
1375
1376            let call = process
1377                .call_tool(
1378                    JsonRpcId::String("call-1".to_string()),
1379                    McpToolCallParams {
1380                        name: "echo".to_string(),
1381                        arguments: Some(json!({"text": "hello"})),
1382                        meta: None,
1383                    },
1384                )
1385                .await
1386                .expect("call tool");
1387            assert_eq!(call.error, None);
1388            let call_result = call.result.expect("tool result");
1389            assert_eq!(call_result.is_error, Some(false));
1390            assert_eq!(
1391                call_result.structured_content,
1392                Some(json!({"echoed": "hello"}))
1393            );
1394            assert_eq!(call_result.content.len(), 1);
1395            assert_eq!(call_result.content[0].kind, "text");
1396            assert_eq!(
1397                call_result.content[0].data.get("text"),
1398                Some(&json!("echo:hello"))
1399            );
1400
1401            let resources = process
1402                .list_resources(JsonRpcId::Number(3), None)
1403                .await
1404                .expect("list resources");
1405            let resources_result = resources.result.expect("resources result");
1406            assert_eq!(resources_result.resources.len(), 1);
1407            assert_eq!(resources_result.resources[0].uri, "file://guide.txt");
1408            assert_eq!(
1409                resources_result.resources[0].mime_type.as_deref(),
1410                Some("text/plain")
1411            );
1412
1413            let read = process
1414                .read_resource(
1415                    JsonRpcId::Number(4),
1416                    McpReadResourceParams {
1417                        uri: "file://guide.txt".to_string(),
1418                    },
1419                )
1420                .await
1421                .expect("read resource");
1422            assert_eq!(
1423                read.result,
1424                Some(McpReadResourceResult {
1425                    contents: vec![super::McpResourceContents {
1426                        uri: "file://guide.txt".to_string(),
1427                        mime_type: Some("text/plain".to_string()),
1428                        text: Some("contents for file://guide.txt".to_string()),
1429                        blob: None,
1430                        meta: None,
1431                    }],
1432                })
1433            );
1434
1435            process.terminate().await.expect("terminate child");
1436            let _ = process.wait().await.expect("wait after kill");
1437            cleanup_script(&script_path);
1438        });
1439    }
1440
1441    #[test]
1442    fn surfaces_jsonrpc_errors_from_tool_calls() {
1443        let runtime = Builder::new_current_thread()
1444            .enable_all()
1445            .build()
1446            .expect("runtime");
1447        runtime.block_on(async {
1448            let script_path = write_mcp_server_script();
1449            let transport = script_transport(&script_path);
1450            let mut process = McpStdioProcess::spawn(&transport).expect("spawn fake mcp server");
1451
1452            let response = process
1453                .call_tool(
1454                    JsonRpcId::Number(9),
1455                    McpToolCallParams {
1456                        name: "fail".to_string(),
1457                        arguments: None,
1458                        meta: None,
1459                    },
1460                )
1461                .await
1462                .expect("call tool with error response");
1463
1464            assert_eq!(response.id, JsonRpcId::Number(9));
1465            assert!(response.result.is_none());
1466            assert_eq!(response.error.as_ref().map(|e| e.code), Some(-32001));
1467            assert_eq!(
1468                response.error.as_ref().map(|e| e.message.as_str()),
1469                Some("tool failed")
1470            );
1471
1472            process.terminate().await.expect("terminate child");
1473            let _ = process.wait().await.expect("wait after kill");
1474            cleanup_script(&script_path);
1475        });
1476    }
1477
1478    #[test]
1479    fn manager_discovers_tools_from_stdio_config() {
1480        let runtime = Builder::new_current_thread()
1481            .enable_all()
1482            .build()
1483            .expect("runtime");
1484        runtime.block_on(async {
1485            let script_path = write_manager_mcp_server_script();
1486            let root = script_path.parent().expect("script parent");
1487            let log_path = root.join("alpha.log");
1488            let servers = BTreeMap::from([(
1489                "alpha".to_string(),
1490                manager_server_config(&script_path, "alpha", &log_path),
1491            )]);
1492            let mut manager = McpServerManager::from_servers(&servers);
1493
1494            let tools = manager.discover_tools().await.expect("discover tools");
1495
1496            assert_eq!(tools.len(), 1);
1497            assert_eq!(tools[0].server_name, "alpha");
1498            assert_eq!(tools[0].raw_name, "echo");
1499            assert_eq!(tools[0].qualified_name, mcp_tool_name("alpha", "echo"));
1500            assert_eq!(tools[0].tool.name, "echo");
1501            assert!(manager.unsupported_servers().is_empty());
1502
1503            manager.shutdown().await.expect("shutdown");
1504            cleanup_script(&script_path);
1505        });
1506    }
1507
1508    #[test]
1509    fn manager_routes_tool_calls_to_correct_server() {
1510        let runtime = Builder::new_current_thread()
1511            .enable_all()
1512            .build()
1513            .expect("runtime");
1514        runtime.block_on(async {
1515            let script_path = write_manager_mcp_server_script();
1516            let root = script_path.parent().expect("script parent");
1517            let alpha_log = root.join("alpha.log");
1518            let beta_log = root.join("beta.log");
1519            let servers = BTreeMap::from([
1520                (
1521                    "alpha".to_string(),
1522                    manager_server_config(&script_path, "alpha", &alpha_log),
1523                ),
1524                (
1525                    "beta".to_string(),
1526                    manager_server_config(&script_path, "beta", &beta_log),
1527                ),
1528            ]);
1529            let mut manager = McpServerManager::from_servers(&servers);
1530
1531            let tools = manager.discover_tools().await.expect("discover tools");
1532            assert_eq!(tools.len(), 2);
1533
1534            let alpha = manager
1535                .call_tool(
1536                    &mcp_tool_name("alpha", "echo"),
1537                    Some(json!({"text": "hello"})),
1538                )
1539                .await
1540                .expect("call alpha tool");
1541            let beta = manager
1542                .call_tool(
1543                    &mcp_tool_name("beta", "echo"),
1544                    Some(json!({"text": "world"})),
1545                )
1546                .await
1547                .expect("call beta tool");
1548
1549            assert_eq!(
1550                alpha
1551                    .result
1552                    .as_ref()
1553                    .and_then(|result| result.structured_content.as_ref())
1554                    .and_then(|value| value.get("server")),
1555                Some(&json!("alpha"))
1556            );
1557            assert_eq!(
1558                beta.result
1559                    .as_ref()
1560                    .and_then(|result| result.structured_content.as_ref())
1561                    .and_then(|value| value.get("server")),
1562                Some(&json!("beta"))
1563            );
1564
1565            manager.shutdown().await.expect("shutdown");
1566            cleanup_script(&script_path);
1567        });
1568    }
1569
1570    #[test]
1571    fn manager_records_unsupported_non_stdio_servers_without_panicking() {
1572        let servers = BTreeMap::from([
1573            (
1574                "http".to_string(),
1575                ScopedMcpServerConfig {
1576                    scope: ConfigSource::Local,
1577                    config: McpServerConfig::Http(McpRemoteServerConfig {
1578                        url: "https://example.test/mcp".to_string(),
1579                        headers: BTreeMap::new(),
1580                        headers_helper: None,
1581                        oauth: None,
1582                    }),
1583                },
1584            ),
1585            (
1586                "sdk".to_string(),
1587                ScopedMcpServerConfig {
1588                    scope: ConfigSource::Local,
1589                    config: McpServerConfig::Sdk(McpSdkServerConfig {
1590                        name: "sdk-server".to_string(),
1591                    }),
1592                },
1593            ),
1594            (
1595                "ws".to_string(),
1596                ScopedMcpServerConfig {
1597                    scope: ConfigSource::Local,
1598                    config: McpServerConfig::Ws(McpWebSocketServerConfig {
1599                        url: "wss://example.test/mcp".to_string(),
1600                        headers: BTreeMap::new(),
1601                        headers_helper: None,
1602                    }),
1603                },
1604            ),
1605        ]);
1606
1607        let manager = McpServerManager::from_servers(&servers);
1608        let unsupported = manager.unsupported_servers();
1609
1610        assert_eq!(unsupported.len(), 3);
1611        assert_eq!(unsupported[0].server_name, "http");
1612        assert_eq!(unsupported[1].server_name, "sdk");
1613        assert_eq!(unsupported[2].server_name, "ws");
1614    }
1615
1616    #[test]
1617    fn manager_shutdown_terminates_spawned_children_and_is_idempotent() {
1618        let runtime = Builder::new_current_thread()
1619            .enable_all()
1620            .build()
1621            .expect("runtime");
1622        runtime.block_on(async {
1623            let script_path = write_manager_mcp_server_script();
1624            let root = script_path.parent().expect("script parent");
1625            let log_path = root.join("alpha.log");
1626            let servers = BTreeMap::from([(
1627                "alpha".to_string(),
1628                manager_server_config(&script_path, "alpha", &log_path),
1629            )]);
1630            let mut manager = McpServerManager::from_servers(&servers);
1631
1632            manager.discover_tools().await.expect("discover tools");
1633            manager.shutdown().await.expect("first shutdown");
1634            manager.shutdown().await.expect("second shutdown");
1635
1636            cleanup_script(&script_path);
1637        });
1638    }
1639
1640    #[test]
1641    fn manager_reuses_spawned_server_between_discovery_and_call() {
1642        let runtime = Builder::new_current_thread()
1643            .enable_all()
1644            .build()
1645            .expect("runtime");
1646        runtime.block_on(async {
1647            let script_path = write_manager_mcp_server_script();
1648            let root = script_path.parent().expect("script parent");
1649            let log_path = root.join("alpha.log");
1650            let servers = BTreeMap::from([(
1651                "alpha".to_string(),
1652                manager_server_config(&script_path, "alpha", &log_path),
1653            )]);
1654            let mut manager = McpServerManager::from_servers(&servers);
1655
1656            manager.discover_tools().await.expect("discover tools");
1657            let response = manager
1658                .call_tool(
1659                    &mcp_tool_name("alpha", "echo"),
1660                    Some(json!({"text": "reuse"})),
1661                )
1662                .await
1663                .expect("call tool");
1664
1665            assert_eq!(
1666                response
1667                    .result
1668                    .as_ref()
1669                    .and_then(|result| result.structured_content.as_ref())
1670                    .and_then(|value| value.get("initializeCount")),
1671                Some(&json!(1))
1672            );
1673
1674            let log = fs::read_to_string(&log_path).expect("read log");
1675            assert_eq!(log.lines().filter(|line| *line == "initialize").count(), 1);
1676            assert_eq!(
1677                log.lines().collect::<Vec<_>>(),
1678                vec!["initialize", "tools/list", "tools/call"]
1679            );
1680
1681            manager.shutdown().await.expect("shutdown");
1682            cleanup_script(&script_path);
1683        });
1684    }
1685
1686    #[test]
1687    fn manager_reports_unknown_qualified_tool_name() {
1688        let runtime = Builder::new_current_thread()
1689            .enable_all()
1690            .build()
1691            .expect("runtime");
1692        runtime.block_on(async {
1693            let script_path = write_manager_mcp_server_script();
1694            let root = script_path.parent().expect("script parent");
1695            let log_path = root.join("alpha.log");
1696            let servers = BTreeMap::from([(
1697                "alpha".to_string(),
1698                manager_server_config(&script_path, "alpha", &log_path),
1699            )]);
1700            let mut manager = McpServerManager::from_servers(&servers);
1701
1702            let error = manager
1703                .call_tool(
1704                    &mcp_tool_name("alpha", "missing"),
1705                    Some(json!({"text": "nope"})),
1706                )
1707                .await
1708                .expect_err("unknown qualified tool should fail");
1709
1710            match error {
1711                McpServerManagerError::UnknownTool { qualified_name } => {
1712                    assert_eq!(qualified_name, mcp_tool_name("alpha", "missing"));
1713                }
1714                other => panic!("expected unknown tool error, got {other:?}"),
1715            }
1716
1717            cleanup_script(&script_path);
1718        });
1719    }
1720}