1use std::collections::BTreeMap;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use serde::Deserialize;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::process::{Child, ChildStdin, ChildStdout};
12use tokio::sync::Mutex;
13
14use crate::stdlib::json_to_vm_value;
15use crate::value::{VmError, VmValue};
16use crate::vm::Vm;
17
18const PROTOCOL_VERSION: &str = "2025-11-25";
20
21const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
23
24#[derive(Clone, Debug, Deserialize)]
25#[serde(rename_all = "lowercase")]
26enum McpTransport {
27 Stdio,
28 Http,
29}
30
31#[derive(Clone, Debug, Deserialize)]
32pub struct McpServerSpec {
33 pub name: String,
34 #[serde(default = "default_transport")]
35 transport: McpTransport,
36 #[serde(default)]
37 pub command: String,
38 #[serde(default)]
39 pub args: Vec<String>,
40 #[serde(default)]
41 pub env: BTreeMap<String, String>,
42 #[serde(default)]
43 pub url: String,
44 #[serde(default)]
45 pub auth_token: Option<String>,
46 #[serde(default)]
47 pub protocol_version: Option<String>,
48 #[serde(default)]
49 pub proxy_server_name: Option<String>,
50}
51
52fn default_transport() -> McpTransport {
53 McpTransport::Stdio
54}
55
56enum McpClientInner {
58 Stdio(StdioMcpClientInner),
59 Http(HttpMcpClientInner),
60}
61
62struct StdioMcpClientInner {
63 child: Child,
64 stdin: ChildStdin,
65 reader: BufReader<ChildStdout>,
66 next_id: u64,
67}
68
69struct HttpMcpClientInner {
70 client: reqwest::Client,
71 url: String,
72 auth_token: Option<String>,
73 protocol_version: String,
74 session_id: Option<String>,
75 next_id: u64,
76 proxy_server_name: Option<String>,
77}
78
79#[derive(Clone)]
81pub struct VmMcpClientHandle {
82 pub name: String,
83 inner: Arc<Mutex<Option<McpClientInner>>>,
84}
85
86impl std::fmt::Debug for VmMcpClientHandle {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 write!(f, "McpClient({})", self.name)
89 }
90}
91
92impl VmMcpClientHandle {
93 async fn call(
94 &self,
95 method: &str,
96 params: serde_json::Value,
97 ) -> Result<serde_json::Value, VmError> {
98 let mut guard = self.inner.lock().await;
99 let inner = guard
100 .as_mut()
101 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
102
103 match inner {
104 McpClientInner::Stdio(inner) => stdio_call(inner, method, params).await,
105 McpClientInner::Http(inner) => http_call(inner, method, params).await,
106 }
107 }
108
109 async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
110 let mut guard = self.inner.lock().await;
111 let inner = guard
112 .as_mut()
113 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
114
115 match inner {
116 McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
117 McpClientInner::Http(inner) => http_notify(inner, method, params).await,
118 }
119 }
120}
121
122async fn stdio_call(
123 inner: &mut StdioMcpClientInner,
124 method: &str,
125 params: serde_json::Value,
126) -> Result<serde_json::Value, VmError> {
127 let id = inner.next_id;
128 inner.next_id += 1;
129
130 let request = serde_json::json!({
131 "jsonrpc": "2.0",
132 "id": id,
133 "method": method,
134 "params": params,
135 });
136
137 let line = serde_json::to_string(&request)
138 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
139 inner
140 .stdin
141 .write_all(line.as_bytes())
142 .await
143 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
144 inner
145 .stdin
146 .write_all(b"\n")
147 .await
148 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
149 inner
150 .stdin
151 .flush()
152 .await
153 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
154
155 let mut line_buf = String::new();
156 loop {
157 line_buf.clear();
158 let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
159 .await
160 .map_err(|_| {
161 VmError::Runtime(format!(
162 "MCP: server did not respond to '{method}' within {}s",
163 MCP_TIMEOUT.as_secs()
164 ))
165 })?
166 .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
167
168 if bytes_read == 0 {
169 return Err(VmError::Runtime("MCP: server closed connection".into()));
170 }
171
172 let trimmed = line_buf.trim();
173 if trimmed.is_empty() {
174 continue;
175 }
176
177 let msg: serde_json::Value = match serde_json::from_str(trimmed) {
178 Ok(v) => v,
179 Err(_) => continue,
180 };
181
182 if msg.get("id").is_none() {
183 continue;
184 }
185
186 if msg["id"].as_u64() == Some(id) {
187 return parse_jsonrpc_result(msg);
188 }
189 }
190}
191
192async fn stdio_notify(
193 inner: &mut StdioMcpClientInner,
194 method: &str,
195 params: serde_json::Value,
196) -> Result<(), VmError> {
197 let notification = serde_json::json!({
198 "jsonrpc": "2.0",
199 "method": method,
200 "params": params,
201 });
202
203 let line = serde_json::to_string(¬ification)
204 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
205 inner
206 .stdin
207 .write_all(line.as_bytes())
208 .await
209 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
210 inner
211 .stdin
212 .write_all(b"\n")
213 .await
214 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
215 inner
216 .stdin
217 .flush()
218 .await
219 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
220 Ok(())
221}
222
223async fn http_call(
224 inner: &mut HttpMcpClientInner,
225 method: &str,
226 params: serde_json::Value,
227) -> Result<serde_json::Value, VmError> {
228 let id = inner.next_id;
229 inner.next_id += 1;
230 send_http_request(inner, method, params, Some(id)).await
231}
232
233async fn http_notify(
234 inner: &mut HttpMcpClientInner,
235 method: &str,
236 params: serde_json::Value,
237) -> Result<(), VmError> {
238 let _ = send_http_request(inner, method, params, None).await?;
239 Ok(())
240}
241
242async fn send_http_request(
243 inner: &mut HttpMcpClientInner,
244 method: &str,
245 params: serde_json::Value,
246 id: Option<u64>,
247) -> Result<serde_json::Value, VmError> {
248 for attempt in 0..2 {
249 let response = send_http_request_once(inner, method, params.clone(), id).await?;
250
251 let status = response.status().as_u16();
252 let headers = response.headers().clone();
253 if let Some(protocol_version) = headers
254 .get("MCP-Protocol-Version")
255 .and_then(|v| v.to_str().ok())
256 {
257 inner.protocol_version = protocol_version.to_string();
258 }
259 if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
260 inner.session_id = Some(session_id.to_string());
261 }
262
263 if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
264 inner.session_id = None;
265 reinitialize_http_client(inner).await?;
266 continue;
267 }
268
269 if status == 401 {
270 return Err(VmError::Thrown(VmValue::String(Rc::from(
271 "MCP authorization required",
272 ))));
273 }
274
275 let body = response
276 .text()
277 .await
278 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
279
280 if body.trim().is_empty() {
281 return Ok(serde_json::Value::Null);
282 }
283
284 let msg = parse_http_response_body(&body, status)?;
285
286 if status >= 400 {
287 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
288 }
289
290 if id.is_none() {
291 return Ok(msg);
292 }
293 return parse_jsonrpc_result(msg);
294 }
295
296 Err(VmError::Runtime("MCP HTTP request failed".into()))
297}
298
299async fn send_http_request_once(
300 inner: &mut HttpMcpClientInner,
301 method: &str,
302 params: serde_json::Value,
303 id: Option<u64>,
304) -> Result<reqwest::Response, VmError> {
305 let payload = if let Some(proxy_server_name) = &inner.proxy_server_name {
306 let mut body = serde_json::json!({
307 "serverName": proxy_server_name,
308 "jsonrpc": "2.0",
309 "method": method,
310 "params": params,
311 });
312 if let Some(id) = id {
313 body["id"] = serde_json::json!(id);
314 }
315 body
316 } else {
317 let mut body = serde_json::json!({
318 "jsonrpc": "2.0",
319 "method": method,
320 "params": params,
321 });
322 if let Some(id) = id {
323 body["id"] = serde_json::json!(id);
324 }
325 body
326 };
327
328 let mut request = inner
329 .client
330 .post(&inner.url)
331 .header("Content-Type", "application/json")
332 .header("Accept", "application/json, text/event-stream")
333 .header("MCP-Protocol-Version", &inner.protocol_version)
334 .json(&payload);
335
336 if let Some(token) = &inner.auth_token {
337 request = request.header("Authorization", format!("Bearer {token}"));
338 }
339 if let Some(session_id) = &inner.session_id {
340 request = request.header("MCP-Session-Id", session_id);
341 }
342
343 request
344 .send()
345 .await
346 .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
347}
348
349async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
350 let initialize = send_http_request_once(
351 inner,
352 "initialize",
353 serde_json::json!({
354 "protocolVersion": PROTOCOL_VERSION,
355 "capabilities": {},
356 "clientInfo": {
357 "name": "harn",
358 "version": env!("CARGO_PKG_VERSION"),
359 }
360 }),
361 Some(0),
362 )
363 .await?;
364 if let Some(protocol_version) = initialize
365 .headers()
366 .get("MCP-Protocol-Version")
367 .and_then(|v| v.to_str().ok())
368 {
369 inner.protocol_version = protocol_version.to_string();
370 }
371 if let Some(session_id) = initialize
372 .headers()
373 .get("MCP-Session-Id")
374 .and_then(|v| v.to_str().ok())
375 {
376 inner.session_id = Some(session_id.to_string());
377 }
378 let status = initialize.status().as_u16();
379 let body = initialize
380 .text()
381 .await
382 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
383 let msg = parse_http_response_body(&body, status)?;
384 if status >= 400 {
385 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
386 }
387 let _ = parse_jsonrpc_result(msg)?;
388 let response = send_http_request_once(
389 inner,
390 "notifications/initialized",
391 serde_json::json!({}),
392 None,
393 )
394 .await?;
395 let status = response.status().as_u16();
396 if let Some(protocol_version) = response
397 .headers()
398 .get("MCP-Protocol-Version")
399 .and_then(|v| v.to_str().ok())
400 {
401 inner.protocol_version = protocol_version.to_string();
402 }
403 if let Some(session_id) = response
404 .headers()
405 .get("MCP-Session-Id")
406 .and_then(|v| v.to_str().ok())
407 {
408 inner.session_id = Some(session_id.to_string());
409 }
410 let body = response
411 .text()
412 .await
413 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
414 if body.trim().is_empty() || status < 400 {
415 return Ok(());
416 }
417 let msg = parse_http_response_body(&body, status)?;
418 Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
419}
420
421fn parse_http_response_body(body: &str, status: u16) -> Result<serde_json::Value, VmError> {
422 if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
423 return parse_sse_jsonrpc_body(body);
424 }
425 serde_json::from_str(body).map_err(|e| {
426 VmError::Runtime(format!(
427 "MCP HTTP response parse error (status {status}): {e}"
428 ))
429 })
430}
431
432fn parse_sse_jsonrpc_body(body: &str) -> Result<serde_json::Value, VmError> {
433 let mut current_data = Vec::new();
434 let mut messages = Vec::new();
435
436 for line in body.lines() {
437 if line.is_empty() {
438 if !current_data.is_empty() {
439 messages.push(current_data.join("\n"));
440 current_data.clear();
441 }
442 continue;
443 }
444 if let Some(data) = line.strip_prefix("data:") {
445 current_data.push(data.trim_start().to_string());
446 }
447 }
448 if !current_data.is_empty() {
449 messages.push(current_data.join("\n"));
450 }
451
452 for message in messages.into_iter().rev() {
453 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
454 if value.get("result").is_some()
455 || value.get("error").is_some()
456 || value.get("method").is_some()
457 {
458 return Ok(value);
459 }
460 }
461 }
462
463 Err(VmError::Runtime(
464 "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
465 ))
466}
467
468fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
469 if let Some(error) = msg.get("error") {
470 return Err(jsonrpc_error_to_vm_error(error));
471 }
472 Ok(msg
473 .get("result")
474 .cloned()
475 .unwrap_or(serde_json::Value::Null))
476}
477
478fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
479 let message = error
480 .get("message")
481 .and_then(|v| v.as_str())
482 .unwrap_or("Unknown MCP error");
483 let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
484 VmError::Thrown(VmValue::String(Rc::from(format!(
485 "MCP error ({code}): {message}"
486 ))))
487}
488
489async fn mcp_connect_stdio_impl(
490 command: &str,
491 args: &[String],
492 env: &BTreeMap<String, String>,
493) -> Result<VmMcpClientHandle, VmError> {
494 let mut cmd = tokio::process::Command::new(command);
495 cmd.args(args)
496 .stdin(std::process::Stdio::piped())
497 .stdout(std::process::Stdio::piped())
498 .stderr(std::process::Stdio::inherit())
499 .envs(env);
500
501 let mut child = cmd.spawn().map_err(|e| {
502 VmError::Thrown(VmValue::String(Rc::from(format!(
503 "mcp_connect: failed to spawn '{command}': {e}"
504 ))))
505 })?;
506
507 let stdin = child
508 .stdin
509 .take()
510 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
511 let stdout = child
512 .stdout
513 .take()
514 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
515
516 let handle = VmMcpClientHandle {
517 name: command.to_string(),
518 inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
519 StdioMcpClientInner {
520 child,
521 stdin,
522 reader: BufReader::new(stdout),
523 next_id: 1,
524 },
525 )))),
526 };
527
528 initialize_client(&handle).await?;
529 Ok(handle)
530}
531
532async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
533 let client = reqwest::Client::builder()
534 .timeout(MCP_TIMEOUT)
535 .build()
536 .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
537
538 let handle = VmMcpClientHandle {
539 name: spec.name.clone(),
540 inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
541 client,
542 url: spec.url.clone(),
543 auth_token: spec.auth_token.clone(),
544 protocol_version: spec
545 .protocol_version
546 .clone()
547 .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
548 session_id: None,
549 next_id: 1,
550 proxy_server_name: spec.proxy_server_name.clone(),
551 })))),
552 };
553
554 initialize_client(&handle).await?;
555 Ok(handle)
556}
557
558async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
559 handle
560 .call(
561 "initialize",
562 serde_json::json!({
563 "protocolVersion": PROTOCOL_VERSION,
564 "capabilities": {},
565 "clientInfo": {
566 "name": "harn",
567 "version": env!("CARGO_PKG_VERSION"),
568 }
569 }),
570 )
571 .await?;
572
573 handle
574 .notify("notifications/initialized", serde_json::json!({}))
575 .await?;
576
577 Ok(())
578}
579
580pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
581 match val {
582 VmValue::String(s) => serde_json::Value::String(s.to_string()),
583 VmValue::Int(n) => serde_json::json!(*n),
584 VmValue::Float(n) => serde_json::json!(*n),
585 VmValue::Bool(b) => serde_json::Value::Bool(*b),
586 VmValue::Nil => serde_json::Value::Null,
587 VmValue::List(items) => {
588 serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
589 }
590 VmValue::Dict(map) => {
591 let obj: serde_json::Map<String, serde_json::Value> = map
592 .iter()
593 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
594 .collect();
595 serde_json::Value::Object(obj)
596 }
597 _ => serde_json::Value::Null,
598 }
599}
600
601fn extract_content_text(result: &serde_json::Value) -> String {
602 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
603 let texts: Vec<&str> = content
604 .iter()
605 .filter_map(|item| {
606 if item.get("type").and_then(|t| t.as_str()) == Some("text") {
607 item.get("text").and_then(|t| t.as_str())
608 } else {
609 None
610 }
611 })
612 .collect();
613 if texts.is_empty() {
614 json_to_vm_value(result).display()
615 } else {
616 texts.join("\n")
617 }
618 } else {
619 json_to_vm_value(result).display()
620 }
621}
622
623pub async fn connect_mcp_server(
624 name: &str,
625 command: &str,
626 args: &[String],
627) -> Result<VmMcpClientHandle, VmError> {
628 let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
629 handle.name = name.to_string();
630 Ok(handle)
631}
632
633pub async fn connect_mcp_server_from_spec(
634 spec: &McpServerSpec,
635) -> Result<VmMcpClientHandle, VmError> {
636 let mut handle = match spec.transport {
637 McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
638 McpTransport::Http => mcp_connect_http_impl(spec).await?,
639 };
640 handle.name = spec.name.clone();
641 Ok(handle)
642}
643
644pub async fn connect_mcp_server_from_json(
645 value: &serde_json::Value,
646) -> Result<VmMcpClientHandle, VmError> {
647 let spec: McpServerSpec = serde_json::from_value(value.clone())
648 .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
649 connect_mcp_server_from_spec(&spec).await
650}
651
652pub fn register_mcp_builtins(vm: &mut Vm) {
653 vm.register_async_builtin("mcp_connect", |args| async move {
654 let command = args.first().map(|a| a.display()).unwrap_or_default();
655 if command.is_empty() {
656 return Err(VmError::Thrown(VmValue::String(Rc::from(
657 "mcp_connect: command is required",
658 ))));
659 }
660
661 let cmd_args: Vec<String> = match args.get(1) {
662 Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
663 _ => Vec::new(),
664 };
665
666 let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
667 Ok(VmValue::McpClient(handle))
668 });
669
670 vm.register_async_builtin("mcp_list_tools", |args| async move {
671 let client = match args.first() {
672 Some(VmValue::McpClient(c)) => c.clone(),
673 _ => {
674 return Err(VmError::Thrown(VmValue::String(Rc::from(
675 "mcp_list_tools: argument must be an MCP client",
676 ))));
677 }
678 };
679
680 let result = client.call("tools/list", serde_json::json!({})).await?;
681 let tools = result
682 .get("tools")
683 .and_then(|t| t.as_array())
684 .cloned()
685 .unwrap_or_default();
686
687 let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
688 Ok(VmValue::List(Rc::new(vm_tools)))
689 });
690
691 vm.register_async_builtin("mcp_call", |args| async move {
692 let client = match args.first() {
693 Some(VmValue::McpClient(c)) => c.clone(),
694 _ => {
695 return Err(VmError::Thrown(VmValue::String(Rc::from(
696 "mcp_call: first argument must be an MCP client",
697 ))));
698 }
699 };
700
701 let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
702 if tool_name.is_empty() {
703 return Err(VmError::Thrown(VmValue::String(Rc::from(
704 "mcp_call: tool name is required",
705 ))));
706 }
707
708 let arguments = match args.get(2) {
709 Some(VmValue::Dict(d)) => {
710 let obj: serde_json::Map<String, serde_json::Value> = d
711 .iter()
712 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
713 .collect();
714 serde_json::Value::Object(obj)
715 }
716 _ => serde_json::json!({}),
717 };
718
719 let result = client
720 .call(
721 "tools/call",
722 serde_json::json!({
723 "name": tool_name,
724 "arguments": arguments,
725 }),
726 )
727 .await?;
728
729 if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
730 let error_text = extract_content_text(&result);
731 return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
732 }
733
734 let content = result
735 .get("content")
736 .and_then(|c| c.as_array())
737 .cloned()
738 .unwrap_or_default();
739
740 if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
741 if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
742 return Ok(VmValue::String(Rc::from(text)));
743 }
744 }
745
746 if content.is_empty() {
747 Ok(VmValue::Nil)
748 } else {
749 Ok(VmValue::List(Rc::new(
750 content.iter().map(json_to_vm_value).collect(),
751 )))
752 }
753 });
754
755 vm.register_async_builtin("mcp_server_info", |args| async move {
756 let client = match args.first() {
757 Some(VmValue::McpClient(c)) => c.clone(),
758 _ => {
759 return Err(VmError::Thrown(VmValue::String(Rc::from(
760 "mcp_server_info: argument must be an MCP client",
761 ))));
762 }
763 };
764
765 let guard = client.inner.lock().await;
766 if guard.is_none() {
767 return Err(VmError::Runtime("MCP client is disconnected".into()));
768 }
769 drop(guard);
770
771 let mut info = BTreeMap::new();
772 info.insert(
773 "name".to_string(),
774 VmValue::String(Rc::from(client.name.as_str())),
775 );
776 info.insert("connected".to_string(), VmValue::Bool(true));
777 Ok(VmValue::Dict(Rc::new(info)))
778 });
779
780 vm.register_async_builtin("mcp_disconnect", |args| async move {
781 let client = match args.first() {
782 Some(VmValue::McpClient(c)) => c.clone(),
783 _ => {
784 return Err(VmError::Thrown(VmValue::String(Rc::from(
785 "mcp_disconnect: argument must be an MCP client",
786 ))));
787 }
788 };
789
790 let mut guard = client.inner.lock().await;
791 if let Some(inner) = guard.take() {
792 match inner {
793 McpClientInner::Stdio(mut inner) => {
794 let _ = inner.child.kill().await;
795 }
796 McpClientInner::Http(_) => {}
797 }
798 }
799 Ok(VmValue::Nil)
800 });
801
802 vm.register_async_builtin("mcp_list_resources", |args| async move {
803 let client = match args.first() {
804 Some(VmValue::McpClient(c)) => c.clone(),
805 _ => {
806 return Err(VmError::Thrown(VmValue::String(Rc::from(
807 "mcp_list_resources: argument must be an MCP client",
808 ))));
809 }
810 };
811
812 let result = client.call("resources/list", serde_json::json!({})).await?;
813 let resources = result
814 .get("resources")
815 .and_then(|r| r.as_array())
816 .cloned()
817 .unwrap_or_default();
818
819 let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
820 Ok(VmValue::List(Rc::new(vm_resources)))
821 });
822
823 vm.register_async_builtin("mcp_read_resource", |args| async move {
824 let client = match args.first() {
825 Some(VmValue::McpClient(c)) => c.clone(),
826 _ => {
827 return Err(VmError::Thrown(VmValue::String(Rc::from(
828 "mcp_read_resource: first argument must be an MCP client",
829 ))));
830 }
831 };
832
833 let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
834 if uri.is_empty() {
835 return Err(VmError::Thrown(VmValue::String(Rc::from(
836 "mcp_read_resource: URI is required",
837 ))));
838 }
839
840 let result = client
841 .call("resources/read", serde_json::json!({ "uri": uri }))
842 .await?;
843
844 let contents = result
845 .get("contents")
846 .and_then(|c| c.as_array())
847 .cloned()
848 .unwrap_or_default();
849
850 if contents.len() == 1 {
851 if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
852 return Ok(VmValue::String(Rc::from(text)));
853 }
854 }
855
856 if contents.is_empty() {
857 Ok(VmValue::Nil)
858 } else {
859 Ok(VmValue::List(Rc::new(
860 contents.iter().map(json_to_vm_value).collect(),
861 )))
862 }
863 });
864
865 vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
866 let client = match args.first() {
867 Some(VmValue::McpClient(c)) => c.clone(),
868 _ => {
869 return Err(VmError::Thrown(VmValue::String(Rc::from(
870 "mcp_list_resource_templates: argument must be an MCP client",
871 ))));
872 }
873 };
874
875 let result = client
876 .call("resources/templates/list", serde_json::json!({}))
877 .await?;
878
879 let templates = result
880 .get("resourceTemplates")
881 .and_then(|r| r.as_array())
882 .cloned()
883 .unwrap_or_default();
884
885 let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
886 Ok(VmValue::List(Rc::new(vm_templates)))
887 });
888
889 vm.register_async_builtin("mcp_list_prompts", |args| async move {
890 let client = match args.first() {
891 Some(VmValue::McpClient(c)) => c.clone(),
892 _ => {
893 return Err(VmError::Thrown(VmValue::String(Rc::from(
894 "mcp_list_prompts: argument must be an MCP client",
895 ))));
896 }
897 };
898
899 let result = client.call("prompts/list", serde_json::json!({})).await?;
900
901 let prompts = result
902 .get("prompts")
903 .and_then(|p| p.as_array())
904 .cloned()
905 .unwrap_or_default();
906
907 let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
908 Ok(VmValue::List(Rc::new(vm_prompts)))
909 });
910
911 vm.register_async_builtin("mcp_get_prompt", |args| async move {
912 let client = match args.first() {
913 Some(VmValue::McpClient(c)) => c.clone(),
914 _ => {
915 return Err(VmError::Thrown(VmValue::String(Rc::from(
916 "mcp_get_prompt: first argument must be an MCP client",
917 ))));
918 }
919 };
920
921 let name = args.get(1).map(|a| a.display()).unwrap_or_default();
922 if name.is_empty() {
923 return Err(VmError::Thrown(VmValue::String(Rc::from(
924 "mcp_get_prompt: prompt name is required",
925 ))));
926 }
927
928 let arguments = match args.get(2) {
929 Some(VmValue::Dict(d)) => {
930 let obj: serde_json::Map<String, serde_json::Value> = d
931 .iter()
932 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
933 .collect();
934 serde_json::Value::Object(obj)
935 }
936 _ => serde_json::json!({}),
937 };
938
939 let result = client
940 .call(
941 "prompts/get",
942 serde_json::json!({
943 "name": name,
944 "arguments": arguments,
945 }),
946 )
947 .await?;
948
949 Ok(json_to_vm_value(&result))
950 });
951}
952
953#[cfg(test)]
954mod tests {
955 use super::*;
956
957 #[test]
958 fn test_vm_value_to_serde_string() {
959 let val = VmValue::String(Rc::from("hello"));
960 let json = vm_value_to_serde(&val);
961 assert_eq!(json, serde_json::json!("hello"));
962 }
963
964 #[test]
965 fn test_vm_value_to_serde_dict() {
966 let mut map = BTreeMap::new();
967 map.insert("key".to_string(), VmValue::Int(42));
968 let val = VmValue::Dict(Rc::new(map));
969 let json = vm_value_to_serde(&val);
970 assert_eq!(json, serde_json::json!({"key": 42}));
971 }
972
973 #[test]
974 fn test_vm_value_to_serde_list() {
975 let val = VmValue::List(Rc::new(vec![VmValue::Int(1), VmValue::Int(2)]));
976 let json = vm_value_to_serde(&val);
977 assert_eq!(json, serde_json::json!([1, 2]));
978 }
979
980 #[test]
981 fn test_extract_content_text_single() {
982 let result = serde_json::json!({
983 "content": [{"type": "text", "text": "hello world"}],
984 "isError": false
985 });
986 assert_eq!(extract_content_text(&result), "hello world");
987 }
988
989 #[test]
990 fn test_extract_content_text_multiple() {
991 let result = serde_json::json!({
992 "content": [
993 {"type": "text", "text": "first"},
994 {"type": "text", "text": "second"}
995 ],
996 "isError": false
997 });
998 assert_eq!(extract_content_text(&result), "first\nsecond");
999 }
1000
1001 #[test]
1002 fn test_extract_content_text_fallback_json() {
1003 let result = serde_json::json!({
1004 "content": [{"type": "image", "data": "abc"}],
1005 "isError": false
1006 });
1007 let output = extract_content_text(&result);
1008 assert!(output.contains("image"));
1009 }
1010
1011 #[test]
1012 fn test_parse_sse_jsonrpc_body_uses_last_jsonrpc_message() {
1013 let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
1014 let parsed = parse_sse_jsonrpc_body(body).unwrap();
1015 assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
1016 }
1017}