Skip to main content

harn_vm/
mcp.rs

//! MCP (Model Context Protocol) client for connecting to external tool servers.
//!
//! Supports stdio transport and streamable HTTP-style request/response transport.
4
5use std::collections::BTreeMap;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use serde::Deserialize;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::process::{Child, ChildStdin, ChildStdout};
12use tokio::sync::Mutex;
13
14use crate::stdlib::json_to_vm_value;
15use crate::value::{VmError, VmValue};
16use crate::vm::Vm;
17
/// MCP protocol revision sent in `initialize` requests and used as the default
/// `MCP-Protocol-Version` header value for HTTP connections.
const PROTOCOL_VERSION: &str = "2025-11-25";

/// Default timeout for MCP requests (60 seconds); applied to stdio response
/// reads and configured as the HTTP client timeout.
const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
23
24#[derive(Clone, Debug, Deserialize)]
25#[serde(rename_all = "lowercase")]
26enum McpTransport {
27    Stdio,
28    Http,
29}
30
31#[derive(Clone, Debug, Deserialize)]
32pub struct McpServerSpec {
33    pub name: String,
34    #[serde(default = "default_transport")]
35    transport: McpTransport,
36    #[serde(default)]
37    pub command: String,
38    #[serde(default)]
39    pub args: Vec<String>,
40    #[serde(default)]
41    pub env: BTreeMap<String, String>,
42    #[serde(default)]
43    pub url: String,
44    #[serde(default)]
45    pub auth_token: Option<String>,
46    #[serde(default)]
47    pub protocol_version: Option<String>,
48    #[serde(default)]
49    pub proxy_server_name: Option<String>,
50}
51
52fn default_transport() -> McpTransport {
53    McpTransport::Stdio
54}
55
56/// Internal state for an MCP client connection.
57enum McpClientInner {
58    Stdio(StdioMcpClientInner),
59    Http(HttpMcpClientInner),
60}
61
62struct StdioMcpClientInner {
63    child: Child,
64    stdin: ChildStdin,
65    reader: BufReader<ChildStdout>,
66    next_id: u64,
67}
68
69struct HttpMcpClientInner {
70    client: reqwest::Client,
71    url: String,
72    auth_token: Option<String>,
73    protocol_version: String,
74    session_id: Option<String>,
75    next_id: u64,
76    proxy_server_name: Option<String>,
77}
78
79/// Handle to an MCP client connection, stored in VmValue.
80#[derive(Clone)]
81pub struct VmMcpClientHandle {
82    pub name: String,
83    inner: Arc<Mutex<Option<McpClientInner>>>,
84}
85
86impl std::fmt::Debug for VmMcpClientHandle {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        write!(f, "McpClient({})", self.name)
89    }
90}
91
92impl VmMcpClientHandle {
93    async fn call(
94        &self,
95        method: &str,
96        params: serde_json::Value,
97    ) -> Result<serde_json::Value, VmError> {
98        let mut guard = self.inner.lock().await;
99        let inner = guard
100            .as_mut()
101            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
102
103        match inner {
104            McpClientInner::Stdio(inner) => stdio_call(inner, method, params).await,
105            McpClientInner::Http(inner) => http_call(inner, method, params).await,
106        }
107    }
108
109    async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
110        let mut guard = self.inner.lock().await;
111        let inner = guard
112            .as_mut()
113            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
114
115        match inner {
116            McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
117            McpClientInner::Http(inner) => http_notify(inner, method, params).await,
118        }
119    }
120}
121
122async fn stdio_call(
123    inner: &mut StdioMcpClientInner,
124    method: &str,
125    params: serde_json::Value,
126) -> Result<serde_json::Value, VmError> {
127    let id = inner.next_id;
128    inner.next_id += 1;
129
130    let request = serde_json::json!({
131        "jsonrpc": "2.0",
132        "id": id,
133        "method": method,
134        "params": params,
135    });
136
137    let line = serde_json::to_string(&request)
138        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
139    inner
140        .stdin
141        .write_all(line.as_bytes())
142        .await
143        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
144    inner
145        .stdin
146        .write_all(b"\n")
147        .await
148        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
149    inner
150        .stdin
151        .flush()
152        .await
153        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
154
155    let mut line_buf = String::new();
156    loop {
157        line_buf.clear();
158        let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
159            .await
160            .map_err(|_| {
161                VmError::Runtime(format!(
162                    "MCP: server did not respond to '{method}' within {}s",
163                    MCP_TIMEOUT.as_secs()
164                ))
165            })?
166            .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
167
168        if bytes_read == 0 {
169            return Err(VmError::Runtime("MCP: server closed connection".into()));
170        }
171
172        let trimmed = line_buf.trim();
173        if trimmed.is_empty() {
174            continue;
175        }
176
177        let msg: serde_json::Value = match serde_json::from_str(trimmed) {
178            Ok(v) => v,
179            Err(_) => continue,
180        };
181
182        if msg.get("id").is_none() {
183            continue;
184        }
185
186        if msg["id"].as_u64() == Some(id) {
187            return parse_jsonrpc_result(msg);
188        }
189    }
190}
191
192async fn stdio_notify(
193    inner: &mut StdioMcpClientInner,
194    method: &str,
195    params: serde_json::Value,
196) -> Result<(), VmError> {
197    let notification = serde_json::json!({
198        "jsonrpc": "2.0",
199        "method": method,
200        "params": params,
201    });
202
203    let line = serde_json::to_string(&notification)
204        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
205    inner
206        .stdin
207        .write_all(line.as_bytes())
208        .await
209        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
210    inner
211        .stdin
212        .write_all(b"\n")
213        .await
214        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
215    inner
216        .stdin
217        .flush()
218        .await
219        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
220    Ok(())
221}
222
223async fn http_call(
224    inner: &mut HttpMcpClientInner,
225    method: &str,
226    params: serde_json::Value,
227) -> Result<serde_json::Value, VmError> {
228    let id = inner.next_id;
229    inner.next_id += 1;
230    send_http_request(inner, method, params, Some(id)).await
231}
232
233async fn http_notify(
234    inner: &mut HttpMcpClientInner,
235    method: &str,
236    params: serde_json::Value,
237) -> Result<(), VmError> {
238    let _ = send_http_request(inner, method, params, None).await?;
239    Ok(())
240}
241
242async fn send_http_request(
243    inner: &mut HttpMcpClientInner,
244    method: &str,
245    params: serde_json::Value,
246    id: Option<u64>,
247) -> Result<serde_json::Value, VmError> {
248    for attempt in 0..2 {
249        let response = send_http_request_once(inner, method, params.clone(), id).await?;
250
251        let status = response.status().as_u16();
252        let headers = response.headers().clone();
253        if let Some(protocol_version) = headers
254            .get("MCP-Protocol-Version")
255            .and_then(|v| v.to_str().ok())
256        {
257            inner.protocol_version = protocol_version.to_string();
258        }
259        if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
260            inner.session_id = Some(session_id.to_string());
261        }
262
263        if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
264            inner.session_id = None;
265            reinitialize_http_client(inner).await?;
266            continue;
267        }
268
269        if status == 401 {
270            return Err(VmError::Thrown(VmValue::String(Rc::from(
271                "MCP authorization required",
272            ))));
273        }
274
275        let body = response
276            .text()
277            .await
278            .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
279
280        if body.trim().is_empty() {
281            return Ok(serde_json::Value::Null);
282        }
283
284        let msg = parse_http_response_body(&body, status)?;
285
286        if status >= 400 {
287            return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
288        }
289
290        if id.is_none() {
291            return Ok(msg);
292        }
293        return parse_jsonrpc_result(msg);
294    }
295
296    Err(VmError::Runtime("MCP HTTP request failed".into()))
297}
298
299async fn send_http_request_once(
300    inner: &mut HttpMcpClientInner,
301    method: &str,
302    params: serde_json::Value,
303    id: Option<u64>,
304) -> Result<reqwest::Response, VmError> {
305    let payload = if let Some(proxy_server_name) = &inner.proxy_server_name {
306        let mut body = serde_json::json!({
307            "serverName": proxy_server_name,
308            "jsonrpc": "2.0",
309            "method": method,
310            "params": params,
311        });
312        if let Some(id) = id {
313            body["id"] = serde_json::json!(id);
314        }
315        body
316    } else {
317        let mut body = serde_json::json!({
318            "jsonrpc": "2.0",
319            "method": method,
320            "params": params,
321        });
322        if let Some(id) = id {
323            body["id"] = serde_json::json!(id);
324        }
325        body
326    };
327
328    let mut request = inner
329        .client
330        .post(&inner.url)
331        .header("Content-Type", "application/json")
332        .header("Accept", "application/json, text/event-stream")
333        .header("MCP-Protocol-Version", &inner.protocol_version)
334        .json(&payload);
335
336    if let Some(token) = &inner.auth_token {
337        request = request.header("Authorization", format!("Bearer {token}"));
338    }
339    if let Some(session_id) = &inner.session_id {
340        request = request.header("MCP-Session-Id", session_id);
341    }
342
343    request
344        .send()
345        .await
346        .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
347}
348
349async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
350    let initialize = send_http_request_once(
351        inner,
352        "initialize",
353        serde_json::json!({
354            "protocolVersion": PROTOCOL_VERSION,
355            "capabilities": {},
356            "clientInfo": {
357                "name": "harn",
358                "version": env!("CARGO_PKG_VERSION"),
359            }
360        }),
361        Some(0),
362    )
363    .await?;
364    if let Some(protocol_version) = initialize
365        .headers()
366        .get("MCP-Protocol-Version")
367        .and_then(|v| v.to_str().ok())
368    {
369        inner.protocol_version = protocol_version.to_string();
370    }
371    if let Some(session_id) = initialize
372        .headers()
373        .get("MCP-Session-Id")
374        .and_then(|v| v.to_str().ok())
375    {
376        inner.session_id = Some(session_id.to_string());
377    }
378    let status = initialize.status().as_u16();
379    let body = initialize
380        .text()
381        .await
382        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
383    let msg = parse_http_response_body(&body, status)?;
384    if status >= 400 {
385        return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
386    }
387    let _ = parse_jsonrpc_result(msg)?;
388    let response = send_http_request_once(
389        inner,
390        "notifications/initialized",
391        serde_json::json!({}),
392        None,
393    )
394    .await?;
395    let status = response.status().as_u16();
396    if let Some(protocol_version) = response
397        .headers()
398        .get("MCP-Protocol-Version")
399        .and_then(|v| v.to_str().ok())
400    {
401        inner.protocol_version = protocol_version.to_string();
402    }
403    if let Some(session_id) = response
404        .headers()
405        .get("MCP-Session-Id")
406        .and_then(|v| v.to_str().ok())
407    {
408        inner.session_id = Some(session_id.to_string());
409    }
410    let body = response
411        .text()
412        .await
413        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
414    if body.trim().is_empty() || status < 400 {
415        return Ok(());
416    }
417    let msg = parse_http_response_body(&body, status)?;
418    Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
419}
420
421fn parse_http_response_body(body: &str, status: u16) -> Result<serde_json::Value, VmError> {
422    if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
423        return parse_sse_jsonrpc_body(body);
424    }
425    serde_json::from_str(body).map_err(|e| {
426        VmError::Runtime(format!(
427            "MCP HTTP response parse error (status {status}): {e}"
428        ))
429    })
430}
431
432fn parse_sse_jsonrpc_body(body: &str) -> Result<serde_json::Value, VmError> {
433    let mut current_data = Vec::new();
434    let mut messages = Vec::new();
435
436    for line in body.lines() {
437        if line.is_empty() {
438            if !current_data.is_empty() {
439                messages.push(current_data.join("\n"));
440                current_data.clear();
441            }
442            continue;
443        }
444        if let Some(data) = line.strip_prefix("data:") {
445            current_data.push(data.trim_start().to_string());
446        }
447    }
448    if !current_data.is_empty() {
449        messages.push(current_data.join("\n"));
450    }
451
452    for message in messages.into_iter().rev() {
453        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
454            if value.get("result").is_some()
455                || value.get("error").is_some()
456                || value.get("method").is_some()
457            {
458                return Ok(value);
459            }
460        }
461    }
462
463    Err(VmError::Runtime(
464        "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
465    ))
466}
467
468fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
469    if let Some(error) = msg.get("error") {
470        return Err(jsonrpc_error_to_vm_error(error));
471    }
472    Ok(msg
473        .get("result")
474        .cloned()
475        .unwrap_or(serde_json::Value::Null))
476}
477
478fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
479    let message = error
480        .get("message")
481        .and_then(|v| v.as_str())
482        .unwrap_or("Unknown MCP error");
483    let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
484    VmError::Thrown(VmValue::String(Rc::from(format!(
485        "MCP error ({code}): {message}"
486    ))))
487}
488
489async fn mcp_connect_stdio_impl(
490    command: &str,
491    args: &[String],
492    env: &BTreeMap<String, String>,
493) -> Result<VmMcpClientHandle, VmError> {
494    let mut cmd = tokio::process::Command::new(command);
495    cmd.args(args)
496        .stdin(std::process::Stdio::piped())
497        .stdout(std::process::Stdio::piped())
498        .stderr(std::process::Stdio::inherit())
499        .envs(env);
500
501    let mut child = cmd.spawn().map_err(|e| {
502        VmError::Thrown(VmValue::String(Rc::from(format!(
503            "mcp_connect: failed to spawn '{command}': {e}"
504        ))))
505    })?;
506
507    let stdin = child
508        .stdin
509        .take()
510        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
511    let stdout = child
512        .stdout
513        .take()
514        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
515
516    let handle = VmMcpClientHandle {
517        name: command.to_string(),
518        inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
519            StdioMcpClientInner {
520                child,
521                stdin,
522                reader: BufReader::new(stdout),
523                next_id: 1,
524            },
525        )))),
526    };
527
528    initialize_client(&handle).await?;
529    Ok(handle)
530}
531
532async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
533    let client = reqwest::Client::builder()
534        .timeout(MCP_TIMEOUT)
535        .build()
536        .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
537
538    let handle = VmMcpClientHandle {
539        name: spec.name.clone(),
540        inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
541            client,
542            url: spec.url.clone(),
543            auth_token: spec.auth_token.clone(),
544            protocol_version: spec
545                .protocol_version
546                .clone()
547                .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
548            session_id: None,
549            next_id: 1,
550            proxy_server_name: spec.proxy_server_name.clone(),
551        })))),
552    };
553
554    initialize_client(&handle).await?;
555    Ok(handle)
556}
557
558async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
559    handle
560        .call(
561            "initialize",
562            serde_json::json!({
563                "protocolVersion": PROTOCOL_VERSION,
564                "capabilities": {},
565                "clientInfo": {
566                    "name": "harn",
567                    "version": env!("CARGO_PKG_VERSION"),
568                }
569            }),
570        )
571        .await?;
572
573    handle
574        .notify("notifications/initialized", serde_json::json!({}))
575        .await?;
576
577    Ok(())
578}
579
580pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
581    match val {
582        VmValue::String(s) => serde_json::Value::String(s.to_string()),
583        VmValue::Int(n) => serde_json::json!(*n),
584        VmValue::Float(n) => serde_json::json!(*n),
585        VmValue::Bool(b) => serde_json::Value::Bool(*b),
586        VmValue::Nil => serde_json::Value::Null,
587        VmValue::List(items) => {
588            serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
589        }
590        VmValue::Dict(map) => {
591            let obj: serde_json::Map<String, serde_json::Value> = map
592                .iter()
593                .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
594                .collect();
595            serde_json::Value::Object(obj)
596        }
597        _ => serde_json::Value::Null,
598    }
599}
600
601fn extract_content_text(result: &serde_json::Value) -> String {
602    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
603        let texts: Vec<&str> = content
604            .iter()
605            .filter_map(|item| {
606                if item.get("type").and_then(|t| t.as_str()) == Some("text") {
607                    item.get("text").and_then(|t| t.as_str())
608                } else {
609                    None
610                }
611            })
612            .collect();
613        if texts.is_empty() {
614            json_to_vm_value(result).display()
615        } else {
616            texts.join("\n")
617        }
618    } else {
619        json_to_vm_value(result).display()
620    }
621}
622
623pub async fn connect_mcp_server(
624    name: &str,
625    command: &str,
626    args: &[String],
627) -> Result<VmMcpClientHandle, VmError> {
628    let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
629    handle.name = name.to_string();
630    Ok(handle)
631}
632
633pub async fn connect_mcp_server_from_spec(
634    spec: &McpServerSpec,
635) -> Result<VmMcpClientHandle, VmError> {
636    let mut handle = match spec.transport {
637        McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
638        McpTransport::Http => mcp_connect_http_impl(spec).await?,
639    };
640    handle.name = spec.name.clone();
641    Ok(handle)
642}
643
644pub async fn connect_mcp_server_from_json(
645    value: &serde_json::Value,
646) -> Result<VmMcpClientHandle, VmError> {
647    let spec: McpServerSpec = serde_json::from_value(value.clone())
648        .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
649    connect_mcp_server_from_spec(&spec).await
650}
651
652pub fn register_mcp_builtins(vm: &mut Vm) {
653    vm.register_async_builtin("mcp_connect", |args| async move {
654        let command = args.first().map(|a| a.display()).unwrap_or_default();
655        if command.is_empty() {
656            return Err(VmError::Thrown(VmValue::String(Rc::from(
657                "mcp_connect: command is required",
658            ))));
659        }
660
661        let cmd_args: Vec<String> = match args.get(1) {
662            Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
663            _ => Vec::new(),
664        };
665
666        let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
667        Ok(VmValue::McpClient(handle))
668    });
669
670    vm.register_async_builtin("mcp_list_tools", |args| async move {
671        let client = match args.first() {
672            Some(VmValue::McpClient(c)) => c.clone(),
673            _ => {
674                return Err(VmError::Thrown(VmValue::String(Rc::from(
675                    "mcp_list_tools: argument must be an MCP client",
676                ))));
677            }
678        };
679
680        let result = client.call("tools/list", serde_json::json!({})).await?;
681        let tools = result
682            .get("tools")
683            .and_then(|t| t.as_array())
684            .cloned()
685            .unwrap_or_default();
686
687        let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
688        Ok(VmValue::List(Rc::new(vm_tools)))
689    });
690
691    vm.register_async_builtin("mcp_call", |args| async move {
692        let client = match args.first() {
693            Some(VmValue::McpClient(c)) => c.clone(),
694            _ => {
695                return Err(VmError::Thrown(VmValue::String(Rc::from(
696                    "mcp_call: first argument must be an MCP client",
697                ))));
698            }
699        };
700
701        let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
702        if tool_name.is_empty() {
703            return Err(VmError::Thrown(VmValue::String(Rc::from(
704                "mcp_call: tool name is required",
705            ))));
706        }
707
708        let arguments = match args.get(2) {
709            Some(VmValue::Dict(d)) => {
710                let obj: serde_json::Map<String, serde_json::Value> = d
711                    .iter()
712                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
713                    .collect();
714                serde_json::Value::Object(obj)
715            }
716            _ => serde_json::json!({}),
717        };
718
719        let result = client
720            .call(
721                "tools/call",
722                serde_json::json!({
723                    "name": tool_name,
724                    "arguments": arguments,
725                }),
726            )
727            .await?;
728
729        if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
730            let error_text = extract_content_text(&result);
731            return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
732        }
733
734        let content = result
735            .get("content")
736            .and_then(|c| c.as_array())
737            .cloned()
738            .unwrap_or_default();
739
740        if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
741            if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
742                return Ok(VmValue::String(Rc::from(text)));
743            }
744        }
745
746        if content.is_empty() {
747            Ok(VmValue::Nil)
748        } else {
749            Ok(VmValue::List(Rc::new(
750                content.iter().map(json_to_vm_value).collect(),
751            )))
752        }
753    });
754
755    vm.register_async_builtin("mcp_server_info", |args| async move {
756        let client = match args.first() {
757            Some(VmValue::McpClient(c)) => c.clone(),
758            _ => {
759                return Err(VmError::Thrown(VmValue::String(Rc::from(
760                    "mcp_server_info: argument must be an MCP client",
761                ))));
762            }
763        };
764
765        let guard = client.inner.lock().await;
766        if guard.is_none() {
767            return Err(VmError::Runtime("MCP client is disconnected".into()));
768        }
769        drop(guard);
770
771        let mut info = BTreeMap::new();
772        info.insert(
773            "name".to_string(),
774            VmValue::String(Rc::from(client.name.as_str())),
775        );
776        info.insert("connected".to_string(), VmValue::Bool(true));
777        Ok(VmValue::Dict(Rc::new(info)))
778    });
779
780    vm.register_async_builtin("mcp_disconnect", |args| async move {
781        let client = match args.first() {
782            Some(VmValue::McpClient(c)) => c.clone(),
783            _ => {
784                return Err(VmError::Thrown(VmValue::String(Rc::from(
785                    "mcp_disconnect: argument must be an MCP client",
786                ))));
787            }
788        };
789
790        let mut guard = client.inner.lock().await;
791        if let Some(inner) = guard.take() {
792            match inner {
793                McpClientInner::Stdio(mut inner) => {
794                    let _ = inner.child.kill().await;
795                }
796                McpClientInner::Http(_) => {}
797            }
798        }
799        Ok(VmValue::Nil)
800    });
801
802    vm.register_async_builtin("mcp_list_resources", |args| async move {
803        let client = match args.first() {
804            Some(VmValue::McpClient(c)) => c.clone(),
805            _ => {
806                return Err(VmError::Thrown(VmValue::String(Rc::from(
807                    "mcp_list_resources: argument must be an MCP client",
808                ))));
809            }
810        };
811
812        let result = client.call("resources/list", serde_json::json!({})).await?;
813        let resources = result
814            .get("resources")
815            .and_then(|r| r.as_array())
816            .cloned()
817            .unwrap_or_default();
818
819        let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
820        Ok(VmValue::List(Rc::new(vm_resources)))
821    });
822
823    vm.register_async_builtin("mcp_read_resource", |args| async move {
824        let client = match args.first() {
825            Some(VmValue::McpClient(c)) => c.clone(),
826            _ => {
827                return Err(VmError::Thrown(VmValue::String(Rc::from(
828                    "mcp_read_resource: first argument must be an MCP client",
829                ))));
830            }
831        };
832
833        let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
834        if uri.is_empty() {
835            return Err(VmError::Thrown(VmValue::String(Rc::from(
836                "mcp_read_resource: URI is required",
837            ))));
838        }
839
840        let result = client
841            .call("resources/read", serde_json::json!({ "uri": uri }))
842            .await?;
843
844        let contents = result
845            .get("contents")
846            .and_then(|c| c.as_array())
847            .cloned()
848            .unwrap_or_default();
849
850        if contents.len() == 1 {
851            if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
852                return Ok(VmValue::String(Rc::from(text)));
853            }
854        }
855
856        if contents.is_empty() {
857            Ok(VmValue::Nil)
858        } else {
859            Ok(VmValue::List(Rc::new(
860                contents.iter().map(json_to_vm_value).collect(),
861            )))
862        }
863    });
864
865    vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
866        let client = match args.first() {
867            Some(VmValue::McpClient(c)) => c.clone(),
868            _ => {
869                return Err(VmError::Thrown(VmValue::String(Rc::from(
870                    "mcp_list_resource_templates: argument must be an MCP client",
871                ))));
872            }
873        };
874
875        let result = client
876            .call("resources/templates/list", serde_json::json!({}))
877            .await?;
878
879        let templates = result
880            .get("resourceTemplates")
881            .and_then(|r| r.as_array())
882            .cloned()
883            .unwrap_or_default();
884
885        let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
886        Ok(VmValue::List(Rc::new(vm_templates)))
887    });
888
889    vm.register_async_builtin("mcp_list_prompts", |args| async move {
890        let client = match args.first() {
891            Some(VmValue::McpClient(c)) => c.clone(),
892            _ => {
893                return Err(VmError::Thrown(VmValue::String(Rc::from(
894                    "mcp_list_prompts: argument must be an MCP client",
895                ))));
896            }
897        };
898
899        let result = client.call("prompts/list", serde_json::json!({})).await?;
900
901        let prompts = result
902            .get("prompts")
903            .and_then(|p| p.as_array())
904            .cloned()
905            .unwrap_or_default();
906
907        let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
908        Ok(VmValue::List(Rc::new(vm_prompts)))
909    });
910
911    vm.register_async_builtin("mcp_get_prompt", |args| async move {
912        let client = match args.first() {
913            Some(VmValue::McpClient(c)) => c.clone(),
914            _ => {
915                return Err(VmError::Thrown(VmValue::String(Rc::from(
916                    "mcp_get_prompt: first argument must be an MCP client",
917                ))));
918            }
919        };
920
921        let name = args.get(1).map(|a| a.display()).unwrap_or_default();
922        if name.is_empty() {
923            return Err(VmError::Thrown(VmValue::String(Rc::from(
924                "mcp_get_prompt: prompt name is required",
925            ))));
926        }
927
928        let arguments = match args.get(2) {
929            Some(VmValue::Dict(d)) => {
930                let obj: serde_json::Map<String, serde_json::Value> = d
931                    .iter()
932                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
933                    .collect();
934                serde_json::Value::Object(obj)
935            }
936            _ => serde_json::json!({}),
937        };
938
939        let result = client
940            .call(
941                "prompts/get",
942                serde_json::json!({
943                    "name": name,
944                    "arguments": arguments,
945                }),
946            )
947            .await?;
948
949        Ok(json_to_vm_value(&result))
950    });
951}
952
#[cfg(test)]
mod tests {
    use super::*;

    // A VM string converts to a JSON string.
    #[test]
    fn test_vm_value_to_serde_string() {
        let converted = vm_value_to_serde(&VmValue::String(Rc::from("hello")));
        assert_eq!(converted, serde_json::json!("hello"));
    }

    // A VM dict converts to a JSON object with converted values.
    #[test]
    fn test_vm_value_to_serde_dict() {
        let entries = BTreeMap::from([("key".to_string(), VmValue::Int(42))]);
        let converted = vm_value_to_serde(&VmValue::Dict(Rc::new(entries)));
        assert_eq!(converted, serde_json::json!({"key": 42}));
    }

    // A VM list converts to a JSON array, preserving order.
    #[test]
    fn test_vm_value_to_serde_list() {
        let items = vec![VmValue::Int(1), VmValue::Int(2)];
        let converted = vm_value_to_serde(&VmValue::List(Rc::new(items)));
        assert_eq!(converted, serde_json::json!([1, 2]));
    }

    // A single text item is returned verbatim.
    #[test]
    fn test_extract_content_text_single() {
        let payload = serde_json::json!({
            "content": [{"type": "text", "text": "hello world"}],
            "isError": false
        });
        assert_eq!(extract_content_text(&payload), "hello world");
    }

    // Several text items are joined with newlines.
    #[test]
    fn test_extract_content_text_multiple() {
        let payload = serde_json::json!({
            "content": [
                {"type": "text", "text": "first"},
                {"type": "text", "text": "second"}
            ],
            "isError": false
        });
        assert_eq!(extract_content_text(&payload), "first\nsecond");
    }

    // Without any text item the whole result is rendered instead.
    #[test]
    fn test_extract_content_text_fallback_json() {
        let payload = serde_json::json!({
            "content": [{"type": "image", "data": "abc"}],
            "isError": false
        });
        assert!(extract_content_text(&payload).contains("image"));
    }

    // With a notification followed by a response, the response is selected.
    #[test]
    fn test_parse_sse_jsonrpc_body_uses_last_jsonrpc_message() {
        let stream = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
        let message = parse_sse_jsonrpc_body(stream).unwrap();
        assert_eq!(message["result"]["tools"], serde_json::json!([]));
    }
}
1013        let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
1014        let parsed = parse_sse_jsonrpc_body(body).unwrap();
1015        assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
1016    }
1017}