1use std::collections::BTreeMap;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use serde::Deserialize;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::process::{Child, ChildStdin, ChildStdout};
12use tokio::sync::Mutex;
13
14use crate::stdlib::json_to_vm_value;
15use crate::value::{VmError, VmValue};
16use crate::vm::Vm;
17
/// MCP protocol revision sent as `protocolVersion` in the `initialize`
/// request and used as the default `MCP-Protocol-Version` HTTP header.
const PROTOCOL_VERSION: &str = "2025-11-25";

/// Upper bound on how long to wait for a server: used for stdio response
/// reads and as the request timeout of the HTTP client.
const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
23
/// Transport used to reach an MCP server.
///
/// Deserialized from the lowercase strings `"stdio"` and `"http"`.
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum McpTransport {
    /// Spawn the server as a child process and talk over its stdin/stdout.
    Stdio,
    /// POST JSON-RPC messages to the server's HTTP endpoint.
    Http,
}
30
/// Declarative description of an MCP server, deserializable from config.
///
/// Only `name` is mandatory; every other field falls back to its default.
/// Which fields are consulted depends on `transport`.
#[derive(Clone, Debug, Deserialize)]
pub struct McpServerSpec {
    /// Name given to the resulting client handle.
    pub name: String,
    /// Transport to use; stdio when omitted.
    #[serde(default = "default_transport")]
    transport: McpTransport,
    /// Executable to spawn (stdio transport).
    #[serde(default)]
    pub command: String,
    /// Arguments passed to `command` (stdio transport).
    #[serde(default)]
    pub args: Vec<String>,
    /// Extra environment variables for the spawned process (stdio transport).
    #[serde(default)]
    pub env: BTreeMap<String, String>,
    /// Endpoint that requests are POSTed to (HTTP transport).
    #[serde(default)]
    pub url: String,
    /// When set, sent as `Authorization: Bearer <token>` (HTTP transport).
    #[serde(default)]
    pub auth_token: Option<String>,
    /// Initial `MCP-Protocol-Version` header value; defaults to
    /// `PROTOCOL_VERSION` when absent (HTTP transport).
    #[serde(default)]
    pub protocol_version: Option<String>,
    /// When set, added to every request payload as `serverName`
    /// (HTTP transport).
    #[serde(default)]
    pub proxy_server_name: Option<String>,
}
51
/// Serde default for `McpServerSpec::transport`: stdio.
fn default_transport() -> McpTransport {
    McpTransport::Stdio
}
55
/// Transport-specific connection state held behind a client handle.
enum McpClientInner {
    /// Connection to a spawned child process.
    Stdio(StdioMcpClientInner),
    /// Connection to an HTTP endpoint.
    Http(HttpMcpClientInner),
}
61
/// State of a stdio-connected MCP server process.
struct StdioMcpClientInner {
    /// The spawned server process; killed by `mcp_disconnect`.
    child: Child,
    /// Pipe that requests and notifications are written to, one JSON
    /// document per line.
    stdin: ChildStdin,
    /// Buffered reader over the server's stdout, read line by line.
    reader: BufReader<ChildStdout>,
    /// Id assigned to the next JSON-RPC request.
    next_id: u64,
}
68
/// State of an HTTP-connected MCP server.
struct HttpMcpClientInner {
    /// HTTP client used for every request.
    client: reqwest::Client,
    /// Endpoint that requests are POSTed to.
    url: String,
    /// When set, sent as `Authorization: Bearer <token>`.
    auth_token: Option<String>,
    /// Value of the outgoing `MCP-Protocol-Version` header; replaced by the
    /// value the server returns in the same header.
    protocol_version: String,
    /// Session id taken from the server's `MCP-Session-Id` response header
    /// and echoed back on later requests.
    session_id: Option<String>,
    /// Id assigned to the next JSON-RPC request.
    next_id: u64,
    /// When set, added to every request payload as `serverName`.
    proxy_server_name: Option<String>,
}
78
/// Cloneable handle to an MCP client connection.
///
/// Clones share the same underlying connection.
#[derive(Clone)]
pub struct VmMcpClientHandle {
    /// Display name of the server this handle talks to.
    pub name: String,
    /// Shared connection state; `None` once the client has been disconnected.
    inner: Arc<Mutex<Option<McpClientInner>>>,
}
85
86impl std::fmt::Debug for VmMcpClientHandle {
87 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88 write!(f, "McpClient({})", self.name)
89 }
90}
91
92impl VmMcpClientHandle {
93 async fn call(
94 &self,
95 method: &str,
96 params: serde_json::Value,
97 ) -> Result<serde_json::Value, VmError> {
98 let mut guard = self.inner.lock().await;
99 let inner = guard
100 .as_mut()
101 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
102
103 match inner {
104 McpClientInner::Stdio(inner) => stdio_call(inner, method, params).await,
105 McpClientInner::Http(inner) => http_call(inner, method, params).await,
106 }
107 }
108
109 async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
110 let mut guard = self.inner.lock().await;
111 let inner = guard
112 .as_mut()
113 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
114
115 match inner {
116 McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
117 McpClientInner::Http(inner) => http_notify(inner, method, params).await,
118 }
119 }
120}
121
122async fn stdio_call(
123 inner: &mut StdioMcpClientInner,
124 method: &str,
125 params: serde_json::Value,
126) -> Result<serde_json::Value, VmError> {
127 let id = inner.next_id;
128 inner.next_id += 1;
129
130 let request = serde_json::json!({
131 "jsonrpc": "2.0",
132 "id": id,
133 "method": method,
134 "params": params,
135 });
136
137 let line = serde_json::to_string(&request)
138 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
139 inner
140 .stdin
141 .write_all(line.as_bytes())
142 .await
143 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
144 inner
145 .stdin
146 .write_all(b"\n")
147 .await
148 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
149 inner
150 .stdin
151 .flush()
152 .await
153 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
154
155 let mut line_buf = String::new();
156 loop {
157 line_buf.clear();
158 let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
159 .await
160 .map_err(|_| {
161 VmError::Runtime(format!(
162 "MCP: server did not respond to '{method}' within {}s",
163 MCP_TIMEOUT.as_secs()
164 ))
165 })?
166 .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
167
168 if bytes_read == 0 {
169 return Err(VmError::Runtime("MCP: server closed connection".into()));
170 }
171
172 let trimmed = line_buf.trim();
173 if trimmed.is_empty() {
174 continue;
175 }
176
177 let msg: serde_json::Value = match serde_json::from_str(trimmed) {
178 Ok(v) => v,
179 Err(_) => continue,
180 };
181
182 if msg.get("id").is_none() {
183 continue;
184 }
185
186 if msg["id"].as_u64() == Some(id) {
187 return parse_jsonrpc_result(msg);
188 }
189 }
190}
191
192async fn stdio_notify(
193 inner: &mut StdioMcpClientInner,
194 method: &str,
195 params: serde_json::Value,
196) -> Result<(), VmError> {
197 let notification = serde_json::json!({
198 "jsonrpc": "2.0",
199 "method": method,
200 "params": params,
201 });
202
203 let line = serde_json::to_string(¬ification)
204 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
205 inner
206 .stdin
207 .write_all(line.as_bytes())
208 .await
209 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
210 inner
211 .stdin
212 .write_all(b"\n")
213 .await
214 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
215 inner
216 .stdin
217 .flush()
218 .await
219 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
220 Ok(())
221}
222
223async fn http_call(
224 inner: &mut HttpMcpClientInner,
225 method: &str,
226 params: serde_json::Value,
227) -> Result<serde_json::Value, VmError> {
228 let id = inner.next_id;
229 inner.next_id += 1;
230 send_http_request(inner, method, params, Some(id)).await
231}
232
233async fn http_notify(
234 inner: &mut HttpMcpClientInner,
235 method: &str,
236 params: serde_json::Value,
237) -> Result<(), VmError> {
238 let _ = send_http_request(inner, method, params, None).await?;
239 Ok(())
240}
241
242async fn send_http_request(
243 inner: &mut HttpMcpClientInner,
244 method: &str,
245 params: serde_json::Value,
246 id: Option<u64>,
247) -> Result<serde_json::Value, VmError> {
248 for attempt in 0..2 {
249 let response = send_http_request_once(inner, method, params.clone(), id).await?;
250
251 let status = response.status().as_u16();
252 let headers = response.headers().clone();
253 if let Some(protocol_version) = headers
254 .get("MCP-Protocol-Version")
255 .and_then(|v| v.to_str().ok())
256 {
257 inner.protocol_version = protocol_version.to_string();
258 }
259 if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
260 inner.session_id = Some(session_id.to_string());
261 }
262
263 if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
264 inner.session_id = None;
265 reinitialize_http_client(inner).await?;
266 continue;
267 }
268
269 if status == 401 {
270 return Err(VmError::Thrown(VmValue::String(Rc::from(
271 "MCP authorization required",
272 ))));
273 }
274
275 let body = response
276 .text()
277 .await
278 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
279
280 if body.trim().is_empty() {
281 return Ok(serde_json::Value::Null);
282 }
283
284 let msg = parse_http_response_body(&body, status)?;
285
286 if status >= 400 {
287 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
288 }
289
290 if id.is_none() {
291 return Ok(msg);
292 }
293 return parse_jsonrpc_result(msg);
294 }
295
296 Err(VmError::Runtime("MCP HTTP request failed".into()))
297}
298
299async fn send_http_request_once(
300 inner: &mut HttpMcpClientInner,
301 method: &str,
302 params: serde_json::Value,
303 id: Option<u64>,
304) -> Result<reqwest::Response, VmError> {
305 let payload = if let Some(proxy_server_name) = &inner.proxy_server_name {
306 let mut body = serde_json::json!({
307 "serverName": proxy_server_name,
308 "jsonrpc": "2.0",
309 "method": method,
310 "params": params,
311 });
312 if let Some(id) = id {
313 body["id"] = serde_json::json!(id);
314 }
315 body
316 } else {
317 let mut body = serde_json::json!({
318 "jsonrpc": "2.0",
319 "method": method,
320 "params": params,
321 });
322 if let Some(id) = id {
323 body["id"] = serde_json::json!(id);
324 }
325 body
326 };
327
328 let mut request = inner
329 .client
330 .post(&inner.url)
331 .header("Content-Type", "application/json")
332 .header("Accept", "application/json, text/event-stream")
333 .header("MCP-Protocol-Version", &inner.protocol_version)
334 .json(&payload);
335
336 if let Some(token) = &inner.auth_token {
337 request = request.header("Authorization", format!("Bearer {token}"));
338 }
339 if let Some(session_id) = &inner.session_id {
340 request = request.header("MCP-Session-Id", session_id);
341 }
342
343 request
344 .send()
345 .await
346 .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
347}
348
349async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
350 let initialize = send_http_request_once(
351 inner,
352 "initialize",
353 serde_json::json!({
354 "protocolVersion": PROTOCOL_VERSION,
355 "capabilities": {},
356 "clientInfo": {
357 "name": "harn",
358 "version": env!("CARGO_PKG_VERSION"),
359 }
360 }),
361 Some(0),
362 )
363 .await?;
364 if let Some(protocol_version) = initialize
365 .headers()
366 .get("MCP-Protocol-Version")
367 .and_then(|v| v.to_str().ok())
368 {
369 inner.protocol_version = protocol_version.to_string();
370 }
371 if let Some(session_id) = initialize
372 .headers()
373 .get("MCP-Session-Id")
374 .and_then(|v| v.to_str().ok())
375 {
376 inner.session_id = Some(session_id.to_string());
377 }
378 let status = initialize.status().as_u16();
379 let body = initialize
380 .text()
381 .await
382 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
383 let msg = parse_http_response_body(&body, status)?;
384 if status >= 400 {
385 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
386 }
387 let _ = parse_jsonrpc_result(msg)?;
388 let response = send_http_request_once(
389 inner,
390 "notifications/initialized",
391 serde_json::json!({}),
392 None,
393 )
394 .await?;
395 let status = response.status().as_u16();
396 if let Some(protocol_version) = response
397 .headers()
398 .get("MCP-Protocol-Version")
399 .and_then(|v| v.to_str().ok())
400 {
401 inner.protocol_version = protocol_version.to_string();
402 }
403 if let Some(session_id) = response
404 .headers()
405 .get("MCP-Session-Id")
406 .and_then(|v| v.to_str().ok())
407 {
408 inner.session_id = Some(session_id.to_string());
409 }
410 let body = response
411 .text()
412 .await
413 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
414 if body.trim().is_empty() || status < 400 {
415 return Ok(());
416 }
417 let msg = parse_http_response_body(&body, status)?;
418 Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
419}
420
421fn parse_http_response_body(body: &str, status: u16) -> Result<serde_json::Value, VmError> {
422 if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
423 return parse_sse_jsonrpc_body(body);
424 }
425 serde_json::from_str(body).map_err(|e| {
426 VmError::Runtime(format!(
427 "MCP HTTP response parse error (status {status}): {e}"
428 ))
429 })
430}
431
432fn parse_sse_jsonrpc_body(body: &str) -> Result<serde_json::Value, VmError> {
433 let mut current_data = Vec::new();
434 let mut messages = Vec::new();
435
436 for line in body.lines() {
437 if line.is_empty() {
438 if !current_data.is_empty() {
439 messages.push(current_data.join("\n"));
440 current_data.clear();
441 }
442 continue;
443 }
444 if let Some(data) = line.strip_prefix("data:") {
445 current_data.push(data.trim_start().to_string());
446 }
447 }
448 if !current_data.is_empty() {
449 messages.push(current_data.join("\n"));
450 }
451
452 for message in messages.into_iter().rev() {
453 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
454 if value.get("result").is_some()
455 || value.get("error").is_some()
456 || value.get("method").is_some()
457 {
458 return Ok(value);
459 }
460 }
461 }
462
463 Err(VmError::Runtime(
464 "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
465 ))
466}
467
468fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
469 if let Some(error) = msg.get("error") {
470 return Err(jsonrpc_error_to_vm_error(error));
471 }
472 Ok(msg
473 .get("result")
474 .cloned()
475 .unwrap_or(serde_json::Value::Null))
476}
477
478fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
479 let message = error
480 .get("message")
481 .and_then(|v| v.as_str())
482 .unwrap_or("Unknown MCP error");
483 let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
484 VmError::Thrown(VmValue::String(Rc::from(format!(
485 "MCP error ({code}): {message}"
486 ))))
487}
488
489async fn mcp_connect_stdio_impl(
490 command: &str,
491 args: &[String],
492 env: &BTreeMap<String, String>,
493) -> Result<VmMcpClientHandle, VmError> {
494 let mut cmd = tokio::process::Command::new(command);
495 cmd.args(args)
496 .stdin(std::process::Stdio::piped())
497 .stdout(std::process::Stdio::piped())
498 .stderr(std::process::Stdio::inherit())
499 .envs(env);
500
501 let mut child = cmd.spawn().map_err(|e| {
502 VmError::Thrown(VmValue::String(Rc::from(format!(
503 "mcp_connect: failed to spawn '{command}': {e}"
504 ))))
505 })?;
506
507 let stdin = child
508 .stdin
509 .take()
510 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
511 let stdout = child
512 .stdout
513 .take()
514 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
515
516 let handle = VmMcpClientHandle {
517 name: command.to_string(),
518 inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
519 StdioMcpClientInner {
520 child,
521 stdin,
522 reader: BufReader::new(stdout),
523 next_id: 1,
524 },
525 )))),
526 };
527
528 initialize_client(&handle).await?;
529 Ok(handle)
530}
531
532async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
533 let client = reqwest::Client::builder()
534 .timeout(MCP_TIMEOUT)
535 .build()
536 .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
537
538 let handle = VmMcpClientHandle {
539 name: spec.name.clone(),
540 inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
541 client,
542 url: spec.url.clone(),
543 auth_token: spec.auth_token.clone(),
544 protocol_version: spec
545 .protocol_version
546 .clone()
547 .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
548 session_id: None,
549 next_id: 1,
550 proxy_server_name: spec.proxy_server_name.clone(),
551 })))),
552 };
553
554 initialize_client(&handle).await?;
555 Ok(handle)
556}
557
558async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
559 handle
560 .call(
561 "initialize",
562 serde_json::json!({
563 "protocolVersion": PROTOCOL_VERSION,
564 "capabilities": {},
565 "clientInfo": {
566 "name": "harn",
567 "version": env!("CARGO_PKG_VERSION"),
568 }
569 }),
570 )
571 .await?;
572
573 handle
574 .notify("notifications/initialized", serde_json::json!({}))
575 .await?;
576
577 Ok(())
578}
579
580pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
581 match val {
582 VmValue::String(s) => serde_json::Value::String(s.to_string()),
583 VmValue::Int(n) => serde_json::json!(*n),
584 VmValue::Float(n) => serde_json::json!(*n),
585 VmValue::Bool(b) => serde_json::Value::Bool(*b),
586 VmValue::Nil => serde_json::Value::Null,
587 VmValue::List(items) => {
588 serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
589 }
590 VmValue::Dict(map) => {
591 let obj: serde_json::Map<String, serde_json::Value> = map
592 .iter()
593 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
594 .collect();
595 serde_json::Value::Object(obj)
596 }
597 _ => serde_json::Value::Null,
598 }
599}
600
601fn extract_content_text(result: &serde_json::Value) -> String {
602 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
603 let texts: Vec<&str> = content
604 .iter()
605 .filter_map(|item| {
606 if item.get("type").and_then(|t| t.as_str()) == Some("text") {
607 item.get("text").and_then(|t| t.as_str())
608 } else {
609 None
610 }
611 })
612 .collect();
613 if texts.is_empty() {
614 json_to_vm_value(result).display()
615 } else {
616 texts.join("\n")
617 }
618 } else {
619 json_to_vm_value(result).display()
620 }
621}
622
623pub async fn connect_mcp_server(
624 name: &str,
625 command: &str,
626 args: &[String],
627) -> Result<VmMcpClientHandle, VmError> {
628 let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
629 handle.name = name.to_string();
630 Ok(handle)
631}
632
633pub async fn connect_mcp_server_from_spec(
634 spec: &McpServerSpec,
635) -> Result<VmMcpClientHandle, VmError> {
636 let mut handle = match spec.transport {
637 McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
638 McpTransport::Http => mcp_connect_http_impl(spec).await?,
639 };
640 handle.name = spec.name.clone();
641 Ok(handle)
642}
643
644pub async fn connect_mcp_server_from_json(
645 value: &serde_json::Value,
646) -> Result<VmMcpClientHandle, VmError> {
647 let spec: McpServerSpec = serde_json::from_value(value.clone())
648 .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
649 connect_mcp_server_from_spec(&spec).await
650}
651
652pub fn register_mcp_builtins(vm: &mut Vm) {
653 vm.register_async_builtin("mcp_connect", |args| async move {
654 let command = args.first().map(|a| a.display()).unwrap_or_default();
655 if command.is_empty() {
656 return Err(VmError::Thrown(VmValue::String(Rc::from(
657 "mcp_connect: command is required",
658 ))));
659 }
660
661 let cmd_args: Vec<String> = match args.get(1) {
662 Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
663 _ => Vec::new(),
664 };
665
666 let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
667 Ok(VmValue::McpClient(handle))
668 });
669
670 vm.register_async_builtin("mcp_ensure_active", |args| async move {
674 let name = match args.first() {
675 Some(VmValue::String(s)) => s.to_string(),
676 Some(other) => other.display(),
677 None => String::new(),
678 };
679 if name.is_empty() {
680 return Err(VmError::Thrown(VmValue::String(Rc::from(
681 "mcp_ensure_active: server name is required",
682 ))));
683 }
684 let handle = crate::mcp_registry::ensure_active(&name).await?;
685 Ok(VmValue::McpClient(handle))
686 });
687
688 vm.register_builtin("mcp_release", |args, _out| {
692 let name = match args.first() {
693 Some(VmValue::String(s)) => s.to_string(),
694 Some(other) => other.display(),
695 None => {
696 return Err(VmError::Thrown(VmValue::String(Rc::from(
697 "mcp_release: server name is required",
698 ))));
699 }
700 };
701 crate::mcp_registry::release(&name);
702 Ok(VmValue::Nil)
703 });
704
705 vm.register_builtin("mcp_registry_status", |_args, _out| {
709 let mut out = Vec::new();
710 for entry in crate::mcp_registry::snapshot_status() {
711 let mut dict = BTreeMap::new();
712 dict.insert(
713 "name".to_string(),
714 VmValue::String(Rc::from(entry.name.as_str())),
715 );
716 dict.insert("lazy".to_string(), VmValue::Bool(entry.lazy));
717 dict.insert("active".to_string(), VmValue::Bool(entry.active));
718 dict.insert(
719 "ref_count".to_string(),
720 VmValue::Int(entry.ref_count as i64),
721 );
722 if let Some(card) = entry.card {
723 dict.insert("card".to_string(), VmValue::String(Rc::from(card.as_str())));
724 }
725 out.push(VmValue::Dict(Rc::new(dict)));
726 }
727 Ok(VmValue::List(Rc::new(out)))
728 });
729
730 vm.register_async_builtin("mcp_server_card", |args| async move {
737 let target = match args.first() {
738 Some(VmValue::String(s)) => s.to_string(),
739 Some(other) => other.display(),
740 None => {
741 return Err(VmError::Thrown(VmValue::String(Rc::from(
742 "mcp_server_card: server name, URL, or path is required",
743 ))));
744 }
745 };
746
747 let source = if target.starts_with("http://")
753 || target.starts_with("https://")
754 || target.contains('/')
755 || target.contains('\\')
756 || target.ends_with(".json")
757 {
758 target.clone()
759 } else {
760 match crate::mcp_registry::get_registration(&target) {
761 Some(reg) => match reg.card {
762 Some(card) => card,
763 None => {
764 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
765 "mcp_server_card: server '{target}' has no 'card' field in harn.toml"
766 )))));
767 }
768 },
769 None => {
770 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
771 "mcp_server_card: no MCP server '{target}' registered (check harn.toml) \
772 — pass a URL or path directly instead"
773 )))));
774 }
775 }
776 };
777
778 let card = crate::mcp_card::fetch_server_card(&source, None)
779 .await
780 .map_err(|e| {
781 VmError::Thrown(VmValue::String(Rc::from(format!("mcp_server_card: {e}"))))
782 })?;
783 Ok(json_to_vm_value(&card))
784 });
785
786 vm.register_async_builtin("mcp_list_tools", |args| async move {
787 let client = match args.first() {
788 Some(VmValue::McpClient(c)) => c.clone(),
789 _ => {
790 return Err(VmError::Thrown(VmValue::String(Rc::from(
791 "mcp_list_tools: argument must be an MCP client",
792 ))));
793 }
794 };
795
796 let result = client.call("tools/list", serde_json::json!({})).await?;
797 let mut tools = result
798 .get("tools")
799 .and_then(|t| t.as_array())
800 .cloned()
801 .unwrap_or_default();
802
803 let server_name = client.name.clone();
808 for tool in tools.iter_mut() {
809 if let Some(obj) = tool.as_object_mut() {
810 obj.entry("_mcp_server")
811 .or_insert_with(|| serde_json::Value::String(server_name.clone()));
812 }
813 }
814
815 let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
816 Ok(VmValue::List(Rc::new(vm_tools)))
817 });
818
819 vm.register_async_builtin("mcp_call", |args| async move {
820 let client = match args.first() {
821 Some(VmValue::McpClient(c)) => c.clone(),
822 _ => {
823 return Err(VmError::Thrown(VmValue::String(Rc::from(
824 "mcp_call: first argument must be an MCP client",
825 ))));
826 }
827 };
828
829 let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
830 if tool_name.is_empty() {
831 return Err(VmError::Thrown(VmValue::String(Rc::from(
832 "mcp_call: tool name is required",
833 ))));
834 }
835
836 let arguments = match args.get(2) {
837 Some(VmValue::Dict(d)) => {
838 let obj: serde_json::Map<String, serde_json::Value> = d
839 .iter()
840 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
841 .collect();
842 serde_json::Value::Object(obj)
843 }
844 _ => serde_json::json!({}),
845 };
846
847 let result = client
848 .call(
849 "tools/call",
850 serde_json::json!({
851 "name": tool_name,
852 "arguments": arguments,
853 }),
854 )
855 .await?;
856
857 if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
858 let error_text = extract_content_text(&result);
859 return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
860 }
861
862 let content = result
863 .get("content")
864 .and_then(|c| c.as_array())
865 .cloned()
866 .unwrap_or_default();
867
868 if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
869 if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
870 return Ok(VmValue::String(Rc::from(text)));
871 }
872 }
873
874 if content.is_empty() {
875 Ok(VmValue::Nil)
876 } else {
877 Ok(VmValue::List(Rc::new(
878 content.iter().map(json_to_vm_value).collect(),
879 )))
880 }
881 });
882
883 vm.register_async_builtin("mcp_server_info", |args| async move {
884 let client = match args.first() {
885 Some(VmValue::McpClient(c)) => c.clone(),
886 _ => {
887 return Err(VmError::Thrown(VmValue::String(Rc::from(
888 "mcp_server_info: argument must be an MCP client",
889 ))));
890 }
891 };
892
893 let guard = client.inner.lock().await;
894 if guard.is_none() {
895 return Err(VmError::Runtime("MCP client is disconnected".into()));
896 }
897 drop(guard);
898
899 let mut info = BTreeMap::new();
900 info.insert(
901 "name".to_string(),
902 VmValue::String(Rc::from(client.name.as_str())),
903 );
904 info.insert("connected".to_string(), VmValue::Bool(true));
905 Ok(VmValue::Dict(Rc::new(info)))
906 });
907
908 vm.register_async_builtin("mcp_disconnect", |args| async move {
909 let client = match args.first() {
910 Some(VmValue::McpClient(c)) => c.clone(),
911 _ => {
912 return Err(VmError::Thrown(VmValue::String(Rc::from(
913 "mcp_disconnect: argument must be an MCP client",
914 ))));
915 }
916 };
917
918 let mut guard = client.inner.lock().await;
919 if let Some(inner) = guard.take() {
920 match inner {
921 McpClientInner::Stdio(mut inner) => {
922 let _ = inner.child.kill().await;
923 }
924 McpClientInner::Http(_) => {}
925 }
926 }
927 Ok(VmValue::Nil)
928 });
929
930 vm.register_async_builtin("mcp_list_resources", |args| async move {
931 let client = match args.first() {
932 Some(VmValue::McpClient(c)) => c.clone(),
933 _ => {
934 return Err(VmError::Thrown(VmValue::String(Rc::from(
935 "mcp_list_resources: argument must be an MCP client",
936 ))));
937 }
938 };
939
940 let result = client.call("resources/list", serde_json::json!({})).await?;
941 let resources = result
942 .get("resources")
943 .and_then(|r| r.as_array())
944 .cloned()
945 .unwrap_or_default();
946
947 let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
948 Ok(VmValue::List(Rc::new(vm_resources)))
949 });
950
951 vm.register_async_builtin("mcp_read_resource", |args| async move {
952 let client = match args.first() {
953 Some(VmValue::McpClient(c)) => c.clone(),
954 _ => {
955 return Err(VmError::Thrown(VmValue::String(Rc::from(
956 "mcp_read_resource: first argument must be an MCP client",
957 ))));
958 }
959 };
960
961 let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
962 if uri.is_empty() {
963 return Err(VmError::Thrown(VmValue::String(Rc::from(
964 "mcp_read_resource: URI is required",
965 ))));
966 }
967
968 let result = client
969 .call("resources/read", serde_json::json!({ "uri": uri }))
970 .await?;
971
972 let contents = result
973 .get("contents")
974 .and_then(|c| c.as_array())
975 .cloned()
976 .unwrap_or_default();
977
978 if contents.len() == 1 {
979 if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
980 return Ok(VmValue::String(Rc::from(text)));
981 }
982 }
983
984 if contents.is_empty() {
985 Ok(VmValue::Nil)
986 } else {
987 Ok(VmValue::List(Rc::new(
988 contents.iter().map(json_to_vm_value).collect(),
989 )))
990 }
991 });
992
993 vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
994 let client = match args.first() {
995 Some(VmValue::McpClient(c)) => c.clone(),
996 _ => {
997 return Err(VmError::Thrown(VmValue::String(Rc::from(
998 "mcp_list_resource_templates: argument must be an MCP client",
999 ))));
1000 }
1001 };
1002
1003 let result = client
1004 .call("resources/templates/list", serde_json::json!({}))
1005 .await?;
1006
1007 let templates = result
1008 .get("resourceTemplates")
1009 .and_then(|r| r.as_array())
1010 .cloned()
1011 .unwrap_or_default();
1012
1013 let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
1014 Ok(VmValue::List(Rc::new(vm_templates)))
1015 });
1016
1017 vm.register_async_builtin("mcp_list_prompts", |args| async move {
1018 let client = match args.first() {
1019 Some(VmValue::McpClient(c)) => c.clone(),
1020 _ => {
1021 return Err(VmError::Thrown(VmValue::String(Rc::from(
1022 "mcp_list_prompts: argument must be an MCP client",
1023 ))));
1024 }
1025 };
1026
1027 let result = client.call("prompts/list", serde_json::json!({})).await?;
1028
1029 let prompts = result
1030 .get("prompts")
1031 .and_then(|p| p.as_array())
1032 .cloned()
1033 .unwrap_or_default();
1034
1035 let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
1036 Ok(VmValue::List(Rc::new(vm_prompts)))
1037 });
1038
1039 vm.register_async_builtin("mcp_get_prompt", |args| async move {
1040 let client = match args.first() {
1041 Some(VmValue::McpClient(c)) => c.clone(),
1042 _ => {
1043 return Err(VmError::Thrown(VmValue::String(Rc::from(
1044 "mcp_get_prompt: first argument must be an MCP client",
1045 ))));
1046 }
1047 };
1048
1049 let name = args.get(1).map(|a| a.display()).unwrap_or_default();
1050 if name.is_empty() {
1051 return Err(VmError::Thrown(VmValue::String(Rc::from(
1052 "mcp_get_prompt: prompt name is required",
1053 ))));
1054 }
1055
1056 let arguments = match args.get(2) {
1057 Some(VmValue::Dict(d)) => {
1058 let obj: serde_json::Map<String, serde_json::Value> = d
1059 .iter()
1060 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1061 .collect();
1062 serde_json::Value::Object(obj)
1063 }
1064 _ => serde_json::json!({}),
1065 };
1066
1067 let result = client
1068 .call(
1069 "prompts/get",
1070 serde_json::json!({
1071 "name": name,
1072 "arguments": arguments,
1073 }),
1074 )
1075 .await?;
1076
1077 Ok(json_to_vm_value(&result))
1078 });
1079}
1080
#[cfg(test)]
mod tests {
    use super::*;

    // --- vm_value_to_serde -------------------------------------------------

    #[test]
    fn test_vm_value_to_serde_string() {
        let converted = vm_value_to_serde(&VmValue::String(Rc::from("hello")));
        assert_eq!(converted, serde_json::json!("hello"));
    }

    #[test]
    fn test_vm_value_to_serde_dict() {
        let mut entries = BTreeMap::new();
        entries.insert("key".to_string(), VmValue::Int(42));
        let converted = vm_value_to_serde(&VmValue::Dict(Rc::new(entries)));
        assert_eq!(converted, serde_json::json!({"key": 42}));
    }

    #[test]
    fn test_vm_value_to_serde_list() {
        let items = vec![VmValue::Int(1), VmValue::Int(2)];
        let converted = vm_value_to_serde(&VmValue::List(Rc::new(items)));
        assert_eq!(converted, serde_json::json!([1, 2]));
    }

    // --- extract_content_text ----------------------------------------------

    #[test]
    fn test_extract_content_text_single() {
        let tool_result = serde_json::json!({
            "content": [{"type": "text", "text": "hello world"}],
            "isError": false
        });
        assert_eq!(extract_content_text(&tool_result), "hello world");
    }

    #[test]
    fn test_extract_content_text_multiple() {
        let tool_result = serde_json::json!({
            "content": [
                {"type": "text", "text": "first"},
                {"type": "text", "text": "second"}
            ],
            "isError": false
        });
        assert_eq!(extract_content_text(&tool_result), "first\nsecond");
    }

    #[test]
    fn test_extract_content_text_fallback_json() {
        // No text items: the whole result is rendered instead.
        let tool_result = serde_json::json!({
            "content": [{"type": "image", "data": "abc"}],
            "isError": false
        });
        let rendered = extract_content_text(&tool_result);
        assert!(rendered.contains("image"));
    }

    // --- parse_sse_jsonrpc_body --------------------------------------------

    #[test]
    fn test_parse_sse_jsonrpc_body_uses_last_jsonrpc_message() {
        // A notification event followed by the response event.
        let stream = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
        let message = parse_sse_jsonrpc_body(stream).unwrap();
        assert_eq!(message["result"]["tools"], serde_json::json!([]));
    }
}