Skip to main content

harn_vm/
mcp.rs

1//! MCP (Model Context Protocol) client for connecting to external tool servers.
2//!
3//! Supports stdio transport and streamable HTTP-style request/response transport.
4
5use std::collections::BTreeMap;
6use std::rc::Rc;
7use std::sync::Arc;
8
9use serde::Deserialize;
10use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
11use tokio::process::{Child, ChildStdin, ChildStdout};
12use tokio::sync::Mutex;
13
14use crate::stdlib::json_to_vm_value;
15use crate::value::{VmError, VmValue};
16use crate::vm::Vm;
17
18/// MCP protocol version we negotiate by default.
19const PROTOCOL_VERSION: &str = "2025-11-25";
20
21/// Default timeout for MCP requests (60 seconds).
22const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
23
24#[derive(Clone, Debug, Deserialize)]
25#[serde(rename_all = "lowercase")]
26enum McpTransport {
27    Stdio,
28    Http,
29}
30
31#[derive(Clone, Debug, Deserialize)]
32pub struct McpServerSpec {
33    pub name: String,
34    #[serde(default = "default_transport")]
35    transport: McpTransport,
36    #[serde(default)]
37    pub command: String,
38    #[serde(default)]
39    pub args: Vec<String>,
40    #[serde(default)]
41    pub env: BTreeMap<String, String>,
42    #[serde(default)]
43    pub url: String,
44    #[serde(default)]
45    pub auth_token: Option<String>,
46    #[serde(default)]
47    pub protocol_version: Option<String>,
48    #[serde(default)]
49    pub proxy_server_name: Option<String>,
50}
51
52fn default_transport() -> McpTransport {
53    McpTransport::Stdio
54}
55
56/// Internal state for an MCP client connection.
57enum McpClientInner {
58    Stdio(StdioMcpClientInner),
59    Http(HttpMcpClientInner),
60}
61
62struct StdioMcpClientInner {
63    child: Child,
64    stdin: ChildStdin,
65    reader: BufReader<ChildStdout>,
66    next_id: u64,
67}
68
69struct HttpMcpClientInner {
70    client: reqwest::Client,
71    url: String,
72    auth_token: Option<String>,
73    protocol_version: String,
74    session_id: Option<String>,
75    next_id: u64,
76    proxy_server_name: Option<String>,
77}
78
79/// Handle to an MCP client connection, stored in VmValue.
80#[derive(Clone)]
81pub struct VmMcpClientHandle {
82    pub name: String,
83    inner: Arc<Mutex<Option<McpClientInner>>>,
84}
85
86impl std::fmt::Debug for VmMcpClientHandle {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        write!(f, "McpClient({})", self.name)
89    }
90}
91
92impl VmMcpClientHandle {
93    async fn call(
94        &self,
95        method: &str,
96        params: serde_json::Value,
97    ) -> Result<serde_json::Value, VmError> {
98        let mut guard = self.inner.lock().await;
99        let inner = guard
100            .as_mut()
101            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
102
103        match inner {
104            McpClientInner::Stdio(inner) => stdio_call(inner, method, params).await,
105            McpClientInner::Http(inner) => http_call(inner, method, params).await,
106        }
107    }
108
109    async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
110        let mut guard = self.inner.lock().await;
111        let inner = guard
112            .as_mut()
113            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
114
115        match inner {
116            McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
117            McpClientInner::Http(inner) => http_notify(inner, method, params).await,
118        }
119    }
120}
121
122async fn stdio_call(
123    inner: &mut StdioMcpClientInner,
124    method: &str,
125    params: serde_json::Value,
126) -> Result<serde_json::Value, VmError> {
127    let id = inner.next_id;
128    inner.next_id += 1;
129
130    let request = serde_json::json!({
131        "jsonrpc": "2.0",
132        "id": id,
133        "method": method,
134        "params": params,
135    });
136
137    let line = serde_json::to_string(&request)
138        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
139    inner
140        .stdin
141        .write_all(line.as_bytes())
142        .await
143        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
144    inner
145        .stdin
146        .write_all(b"\n")
147        .await
148        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
149    inner
150        .stdin
151        .flush()
152        .await
153        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
154
155    let mut line_buf = String::new();
156    loop {
157        line_buf.clear();
158        let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
159            .await
160            .map_err(|_| {
161                VmError::Runtime(format!(
162                    "MCP: server did not respond to '{method}' within {}s",
163                    MCP_TIMEOUT.as_secs()
164                ))
165            })?
166            .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
167
168        if bytes_read == 0 {
169            return Err(VmError::Runtime("MCP: server closed connection".into()));
170        }
171
172        let trimmed = line_buf.trim();
173        if trimmed.is_empty() {
174            continue;
175        }
176
177        let msg: serde_json::Value = match serde_json::from_str(trimmed) {
178            Ok(v) => v,
179            Err(_) => continue,
180        };
181
182        if msg.get("id").is_none() {
183            continue;
184        }
185
186        if msg["id"].as_u64() == Some(id) {
187            return parse_jsonrpc_result(msg);
188        }
189    }
190}
191
192async fn stdio_notify(
193    inner: &mut StdioMcpClientInner,
194    method: &str,
195    params: serde_json::Value,
196) -> Result<(), VmError> {
197    let notification = serde_json::json!({
198        "jsonrpc": "2.0",
199        "method": method,
200        "params": params,
201    });
202
203    let line = serde_json::to_string(&notification)
204        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
205    inner
206        .stdin
207        .write_all(line.as_bytes())
208        .await
209        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
210    inner
211        .stdin
212        .write_all(b"\n")
213        .await
214        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
215    inner
216        .stdin
217        .flush()
218        .await
219        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
220    Ok(())
221}
222
223async fn http_call(
224    inner: &mut HttpMcpClientInner,
225    method: &str,
226    params: serde_json::Value,
227) -> Result<serde_json::Value, VmError> {
228    let id = inner.next_id;
229    inner.next_id += 1;
230    send_http_request(inner, method, params, Some(id)).await
231}
232
233async fn http_notify(
234    inner: &mut HttpMcpClientInner,
235    method: &str,
236    params: serde_json::Value,
237) -> Result<(), VmError> {
238    let _ = send_http_request(inner, method, params, None).await?;
239    Ok(())
240}
241
242async fn send_http_request(
243    inner: &mut HttpMcpClientInner,
244    method: &str,
245    params: serde_json::Value,
246    id: Option<u64>,
247) -> Result<serde_json::Value, VmError> {
248    for attempt in 0..2 {
249        let response = send_http_request_once(inner, method, params.clone(), id).await?;
250
251        let status = response.status().as_u16();
252        let headers = response.headers().clone();
253        if let Some(protocol_version) = headers
254            .get("MCP-Protocol-Version")
255            .and_then(|v| v.to_str().ok())
256        {
257            inner.protocol_version = protocol_version.to_string();
258        }
259        if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
260            inner.session_id = Some(session_id.to_string());
261        }
262
263        if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
264            inner.session_id = None;
265            reinitialize_http_client(inner).await?;
266            continue;
267        }
268
269        if status == 401 {
270            return Err(VmError::Thrown(VmValue::String(Rc::from(
271                "MCP authorization required",
272            ))));
273        }
274
275        let body = response
276            .text()
277            .await
278            .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
279
280        if body.trim().is_empty() {
281            return Ok(serde_json::Value::Null);
282        }
283
284        let msg = parse_http_response_body(&body, status)?;
285
286        if status >= 400 {
287            return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
288        }
289
290        if id.is_none() {
291            return Ok(msg);
292        }
293        return parse_jsonrpc_result(msg);
294    }
295
296    Err(VmError::Runtime("MCP HTTP request failed".into()))
297}
298
299async fn send_http_request_once(
300    inner: &mut HttpMcpClientInner,
301    method: &str,
302    params: serde_json::Value,
303    id: Option<u64>,
304) -> Result<reqwest::Response, VmError> {
305    let payload = if let Some(proxy_server_name) = &inner.proxy_server_name {
306        let mut body = serde_json::json!({
307            "serverName": proxy_server_name,
308            "jsonrpc": "2.0",
309            "method": method,
310            "params": params,
311        });
312        if let Some(id) = id {
313            body["id"] = serde_json::json!(id);
314        }
315        body
316    } else {
317        let mut body = serde_json::json!({
318            "jsonrpc": "2.0",
319            "method": method,
320            "params": params,
321        });
322        if let Some(id) = id {
323            body["id"] = serde_json::json!(id);
324        }
325        body
326    };
327
328    let mut request = inner
329        .client
330        .post(&inner.url)
331        .header("Content-Type", "application/json")
332        .header("Accept", "application/json, text/event-stream")
333        .header("MCP-Protocol-Version", &inner.protocol_version)
334        .json(&payload);
335
336    if let Some(token) = &inner.auth_token {
337        request = request.header("Authorization", format!("Bearer {token}"));
338    }
339    if let Some(session_id) = &inner.session_id {
340        request = request.header("MCP-Session-Id", session_id);
341    }
342
343    request
344        .send()
345        .await
346        .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
347}
348
349async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
350    let initialize = send_http_request_once(
351        inner,
352        "initialize",
353        serde_json::json!({
354            "protocolVersion": PROTOCOL_VERSION,
355            "capabilities": {},
356            "clientInfo": {
357                "name": "harn",
358                "version": env!("CARGO_PKG_VERSION"),
359            }
360        }),
361        Some(0),
362    )
363    .await?;
364    if let Some(protocol_version) = initialize
365        .headers()
366        .get("MCP-Protocol-Version")
367        .and_then(|v| v.to_str().ok())
368    {
369        inner.protocol_version = protocol_version.to_string();
370    }
371    if let Some(session_id) = initialize
372        .headers()
373        .get("MCP-Session-Id")
374        .and_then(|v| v.to_str().ok())
375    {
376        inner.session_id = Some(session_id.to_string());
377    }
378    let status = initialize.status().as_u16();
379    let body = initialize
380        .text()
381        .await
382        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
383    let msg = parse_http_response_body(&body, status)?;
384    if status >= 400 {
385        return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
386    }
387    let _ = parse_jsonrpc_result(msg)?;
388    let response = send_http_request_once(
389        inner,
390        "notifications/initialized",
391        serde_json::json!({}),
392        None,
393    )
394    .await?;
395    let status = response.status().as_u16();
396    if let Some(protocol_version) = response
397        .headers()
398        .get("MCP-Protocol-Version")
399        .and_then(|v| v.to_str().ok())
400    {
401        inner.protocol_version = protocol_version.to_string();
402    }
403    if let Some(session_id) = response
404        .headers()
405        .get("MCP-Session-Id")
406        .and_then(|v| v.to_str().ok())
407    {
408        inner.session_id = Some(session_id.to_string());
409    }
410    let body = response
411        .text()
412        .await
413        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
414    if body.trim().is_empty() || status < 400 {
415        return Ok(());
416    }
417    let msg = parse_http_response_body(&body, status)?;
418    Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
419}
420
421fn parse_http_response_body(body: &str, status: u16) -> Result<serde_json::Value, VmError> {
422    if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
423        return parse_sse_jsonrpc_body(body);
424    }
425    serde_json::from_str(body).map_err(|e| {
426        VmError::Runtime(format!(
427            "MCP HTTP response parse error (status {status}): {e}"
428        ))
429    })
430}
431
432fn parse_sse_jsonrpc_body(body: &str) -> Result<serde_json::Value, VmError> {
433    let mut current_data = Vec::new();
434    let mut messages = Vec::new();
435
436    for line in body.lines() {
437        if line.is_empty() {
438            if !current_data.is_empty() {
439                messages.push(current_data.join("\n"));
440                current_data.clear();
441            }
442            continue;
443        }
444        if let Some(data) = line.strip_prefix("data:") {
445            current_data.push(data.trim_start().to_string());
446        }
447    }
448    if !current_data.is_empty() {
449        messages.push(current_data.join("\n"));
450    }
451
452    for message in messages.into_iter().rev() {
453        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
454            if value.get("result").is_some()
455                || value.get("error").is_some()
456                || value.get("method").is_some()
457            {
458                return Ok(value);
459            }
460        }
461    }
462
463    Err(VmError::Runtime(
464        "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
465    ))
466}
467
468fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
469    if let Some(error) = msg.get("error") {
470        return Err(jsonrpc_error_to_vm_error(error));
471    }
472    Ok(msg
473        .get("result")
474        .cloned()
475        .unwrap_or(serde_json::Value::Null))
476}
477
478fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
479    let message = error
480        .get("message")
481        .and_then(|v| v.as_str())
482        .unwrap_or("Unknown MCP error");
483    let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
484    VmError::Thrown(VmValue::String(Rc::from(format!(
485        "MCP error ({code}): {message}"
486    ))))
487}
488
489async fn mcp_connect_stdio_impl(
490    command: &str,
491    args: &[String],
492    env: &BTreeMap<String, String>,
493) -> Result<VmMcpClientHandle, VmError> {
494    let mut cmd = tokio::process::Command::new(command);
495    cmd.args(args)
496        .stdin(std::process::Stdio::piped())
497        .stdout(std::process::Stdio::piped())
498        .stderr(std::process::Stdio::inherit())
499        .envs(env);
500
501    let mut child = cmd.spawn().map_err(|e| {
502        VmError::Thrown(VmValue::String(Rc::from(format!(
503            "mcp_connect: failed to spawn '{command}': {e}"
504        ))))
505    })?;
506
507    let stdin = child
508        .stdin
509        .take()
510        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
511    let stdout = child
512        .stdout
513        .take()
514        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
515
516    let handle = VmMcpClientHandle {
517        name: command.to_string(),
518        inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
519            StdioMcpClientInner {
520                child,
521                stdin,
522                reader: BufReader::new(stdout),
523                next_id: 1,
524            },
525        )))),
526    };
527
528    initialize_client(&handle).await?;
529    Ok(handle)
530}
531
532async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
533    let client = reqwest::Client::builder()
534        .timeout(MCP_TIMEOUT)
535        .build()
536        .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
537
538    let handle = VmMcpClientHandle {
539        name: spec.name.clone(),
540        inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
541            client,
542            url: spec.url.clone(),
543            auth_token: spec.auth_token.clone(),
544            protocol_version: spec
545                .protocol_version
546                .clone()
547                .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
548            session_id: None,
549            next_id: 1,
550            proxy_server_name: spec.proxy_server_name.clone(),
551        })))),
552    };
553
554    initialize_client(&handle).await?;
555    Ok(handle)
556}
557
558async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
559    handle
560        .call(
561            "initialize",
562            serde_json::json!({
563                "protocolVersion": PROTOCOL_VERSION,
564                "capabilities": {},
565                "clientInfo": {
566                    "name": "harn",
567                    "version": env!("CARGO_PKG_VERSION"),
568                }
569            }),
570        )
571        .await?;
572
573    handle
574        .notify("notifications/initialized", serde_json::json!({}))
575        .await?;
576
577    Ok(())
578}
579
580pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
581    match val {
582        VmValue::String(s) => serde_json::Value::String(s.to_string()),
583        VmValue::Int(n) => serde_json::json!(*n),
584        VmValue::Float(n) => serde_json::json!(*n),
585        VmValue::Bool(b) => serde_json::Value::Bool(*b),
586        VmValue::Nil => serde_json::Value::Null,
587        VmValue::List(items) => {
588            serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
589        }
590        VmValue::Dict(map) => {
591            let obj: serde_json::Map<String, serde_json::Value> = map
592                .iter()
593                .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
594                .collect();
595            serde_json::Value::Object(obj)
596        }
597        _ => serde_json::Value::Null,
598    }
599}
600
601fn extract_content_text(result: &serde_json::Value) -> String {
602    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
603        let texts: Vec<&str> = content
604            .iter()
605            .filter_map(|item| {
606                if item.get("type").and_then(|t| t.as_str()) == Some("text") {
607                    item.get("text").and_then(|t| t.as_str())
608                } else {
609                    None
610                }
611            })
612            .collect();
613        if texts.is_empty() {
614            json_to_vm_value(result).display()
615        } else {
616            texts.join("\n")
617        }
618    } else {
619        json_to_vm_value(result).display()
620    }
621}
622
623pub async fn connect_mcp_server(
624    name: &str,
625    command: &str,
626    args: &[String],
627) -> Result<VmMcpClientHandle, VmError> {
628    let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
629    handle.name = name.to_string();
630    Ok(handle)
631}
632
633pub async fn connect_mcp_server_from_spec(
634    spec: &McpServerSpec,
635) -> Result<VmMcpClientHandle, VmError> {
636    let mut handle = match spec.transport {
637        McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
638        McpTransport::Http => mcp_connect_http_impl(spec).await?,
639    };
640    handle.name = spec.name.clone();
641    Ok(handle)
642}
643
644pub async fn connect_mcp_server_from_json(
645    value: &serde_json::Value,
646) -> Result<VmMcpClientHandle, VmError> {
647    let spec: McpServerSpec = serde_json::from_value(value.clone())
648        .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
649    connect_mcp_server_from_spec(&spec).await
650}
651
652pub fn register_mcp_builtins(vm: &mut Vm) {
653    vm.register_async_builtin("mcp_connect", |args| async move {
654        let command = args.first().map(|a| a.display()).unwrap_or_default();
655        if command.is_empty() {
656            return Err(VmError::Thrown(VmValue::String(Rc::from(
657                "mcp_connect: command is required",
658            ))));
659        }
660
661        let cmd_args: Vec<String> = match args.get(1) {
662            Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
663            _ => Vec::new(),
664        };
665
666        let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
667        Ok(VmValue::McpClient(handle))
668    });
669
670    // Lazy registry: ensure a registered server is booted and return its
671    // live client handle. Used by skill activation (`requires_mcp`) and
672    // by user code that wants to trigger a lazy connect explicitly.
673    vm.register_async_builtin("mcp_ensure_active", |args| async move {
674        let name = match args.first() {
675            Some(VmValue::String(s)) => s.to_string(),
676            Some(other) => other.display(),
677            None => String::new(),
678        };
679        if name.is_empty() {
680            return Err(VmError::Thrown(VmValue::String(Rc::from(
681                "mcp_ensure_active: server name is required",
682            ))));
683        }
684        let handle = crate::mcp_registry::ensure_active(&name).await?;
685        Ok(VmValue::McpClient(handle))
686    });
687
688    // Decrement the binder refcount for a registered server. Called by
689    // skill deactivation paths and by user code that manually bound via
690    // `mcp_ensure_active`. No-op when the name isn't registered.
691    vm.register_builtin("mcp_release", |args, _out| {
692        let name = match args.first() {
693            Some(VmValue::String(s)) => s.to_string(),
694            Some(other) => other.display(),
695            None => {
696                return Err(VmError::Thrown(VmValue::String(Rc::from(
697                    "mcp_release: server name is required",
698                ))));
699            }
700        };
701        crate::mcp_registry::release(&name);
702        Ok(VmValue::Nil)
703    });
704
705    // Return the declared MCP servers and their current state as a list
706    // of dicts. Purely diagnostic — useful for `harn` scripts that want
707    // to show connection state in a status-line or dashboard.
708    vm.register_builtin("mcp_registry_status", |_args, _out| {
709        let mut out = Vec::new();
710        for entry in crate::mcp_registry::snapshot_status() {
711            let mut dict = BTreeMap::new();
712            dict.insert(
713                "name".to_string(),
714                VmValue::String(Rc::from(entry.name.as_str())),
715            );
716            dict.insert("lazy".to_string(), VmValue::Bool(entry.lazy));
717            dict.insert("active".to_string(), VmValue::Bool(entry.active));
718            dict.insert(
719                "ref_count".to_string(),
720                VmValue::Int(entry.ref_count as i64),
721            );
722            if let Some(card) = entry.card {
723                dict.insert("card".to_string(), VmValue::String(Rc::from(card.as_str())));
724            }
725            out.push(VmValue::Dict(Rc::new(dict)));
726        }
727        Ok(VmValue::List(Rc::new(out)))
728    });
729
730    // Fetch (or read from cache) the Server Card for a registered MCP
731    // server, or from an explicit URL / local path.
732    //
733    // `mcp_server_card("notion")`           -> looks up `card = ...` in harn.toml
734    // `mcp_server_card("https://.../card")` -> fetches that URL directly
735    // `mcp_server_card("./card.json")`      -> reads that file directly
736    vm.register_async_builtin("mcp_server_card", |args| async move {
737        let target = match args.first() {
738            Some(VmValue::String(s)) => s.to_string(),
739            Some(other) => other.display(),
740            None => {
741                return Err(VmError::Thrown(VmValue::String(Rc::from(
742                    "mcp_server_card: server name, URL, or path is required",
743                ))));
744            }
745        };
746
747        // Source resolution: if the arg looks like a URL or path
748        // (contains '/', '\\', or starts with a scheme), use it as-is.
749        // Otherwise treat it as a registered server name and look up
750        // its `card` field. This matches the user model: "I already
751        // wrote down where the card lives in harn.toml — just use it."
752        let source = if target.starts_with("http://")
753            || target.starts_with("https://")
754            || target.contains('/')
755            || target.contains('\\')
756            || target.ends_with(".json")
757        {
758            target.clone()
759        } else {
760            match crate::mcp_registry::get_registration(&target) {
761                Some(reg) => match reg.card {
762                    Some(card) => card,
763                    None => {
764                        return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
765                            "mcp_server_card: server '{target}' has no 'card' field in harn.toml"
766                        )))));
767                    }
768                },
769                None => {
770                    return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
771                        "mcp_server_card: no MCP server '{target}' registered (check harn.toml) \
772                         — pass a URL or path directly instead"
773                    )))));
774                }
775            }
776        };
777
778        let card = crate::mcp_card::fetch_server_card(&source, None)
779            .await
780            .map_err(|e| {
781                VmError::Thrown(VmValue::String(Rc::from(format!("mcp_server_card: {e}"))))
782            })?;
783        Ok(json_to_vm_value(&card))
784    });
785
786    vm.register_async_builtin("mcp_list_tools", |args| async move {
787        let client = match args.first() {
788            Some(VmValue::McpClient(c)) => c.clone(),
789            _ => {
790                return Err(VmError::Thrown(VmValue::String(Rc::from(
791                    "mcp_list_tools: argument must be an MCP client",
792                ))));
793            }
794        };
795
796        let result = client.call("tools/list", serde_json::json!({})).await?;
797        let mut tools = result
798            .get("tools")
799            .and_then(|t| t.as_array())
800            .cloned()
801            .unwrap_or_default();
802
803        // Tag every tool with its originating server name so
804        // downstream indexers (tool_search BM25) can surface them
805        // under queries like "github" or "mcp:github". Harmless to
806        // non-indexing callers — just an extra dict key.
807        let server_name = client.name.clone();
808        for tool in tools.iter_mut() {
809            if let Some(obj) = tool.as_object_mut() {
810                obj.entry("_mcp_server")
811                    .or_insert_with(|| serde_json::Value::String(server_name.clone()));
812            }
813        }
814
815        let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
816        Ok(VmValue::List(Rc::new(vm_tools)))
817    });
818
819    vm.register_async_builtin("mcp_call", |args| async move {
820        let client = match args.first() {
821            Some(VmValue::McpClient(c)) => c.clone(),
822            _ => {
823                return Err(VmError::Thrown(VmValue::String(Rc::from(
824                    "mcp_call: first argument must be an MCP client",
825                ))));
826            }
827        };
828
829        let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
830        if tool_name.is_empty() {
831            return Err(VmError::Thrown(VmValue::String(Rc::from(
832                "mcp_call: tool name is required",
833            ))));
834        }
835
836        let arguments = match args.get(2) {
837            Some(VmValue::Dict(d)) => {
838                let obj: serde_json::Map<String, serde_json::Value> = d
839                    .iter()
840                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
841                    .collect();
842                serde_json::Value::Object(obj)
843            }
844            _ => serde_json::json!({}),
845        };
846
847        let result = client
848            .call(
849                "tools/call",
850                serde_json::json!({
851                    "name": tool_name,
852                    "arguments": arguments,
853                }),
854            )
855            .await?;
856
857        if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
858            let error_text = extract_content_text(&result);
859            return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
860        }
861
862        let content = result
863            .get("content")
864            .and_then(|c| c.as_array())
865            .cloned()
866            .unwrap_or_default();
867
868        if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
869            if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
870                return Ok(VmValue::String(Rc::from(text)));
871            }
872        }
873
874        if content.is_empty() {
875            Ok(VmValue::Nil)
876        } else {
877            Ok(VmValue::List(Rc::new(
878                content.iter().map(json_to_vm_value).collect(),
879            )))
880        }
881    });
882
883    vm.register_async_builtin("mcp_server_info", |args| async move {
884        let client = match args.first() {
885            Some(VmValue::McpClient(c)) => c.clone(),
886            _ => {
887                return Err(VmError::Thrown(VmValue::String(Rc::from(
888                    "mcp_server_info: argument must be an MCP client",
889                ))));
890            }
891        };
892
893        let guard = client.inner.lock().await;
894        if guard.is_none() {
895            return Err(VmError::Runtime("MCP client is disconnected".into()));
896        }
897        drop(guard);
898
899        let mut info = BTreeMap::new();
900        info.insert(
901            "name".to_string(),
902            VmValue::String(Rc::from(client.name.as_str())),
903        );
904        info.insert("connected".to_string(), VmValue::Bool(true));
905        Ok(VmValue::Dict(Rc::new(info)))
906    });
907
908    vm.register_async_builtin("mcp_disconnect", |args| async move {
909        let client = match args.first() {
910            Some(VmValue::McpClient(c)) => c.clone(),
911            _ => {
912                return Err(VmError::Thrown(VmValue::String(Rc::from(
913                    "mcp_disconnect: argument must be an MCP client",
914                ))));
915            }
916        };
917
918        let mut guard = client.inner.lock().await;
919        if let Some(inner) = guard.take() {
920            match inner {
921                McpClientInner::Stdio(mut inner) => {
922                    let _ = inner.child.kill().await;
923                }
924                McpClientInner::Http(_) => {}
925            }
926        }
927        Ok(VmValue::Nil)
928    });
929
930    vm.register_async_builtin("mcp_list_resources", |args| async move {
931        let client = match args.first() {
932            Some(VmValue::McpClient(c)) => c.clone(),
933            _ => {
934                return Err(VmError::Thrown(VmValue::String(Rc::from(
935                    "mcp_list_resources: argument must be an MCP client",
936                ))));
937            }
938        };
939
940        let result = client.call("resources/list", serde_json::json!({})).await?;
941        let resources = result
942            .get("resources")
943            .and_then(|r| r.as_array())
944            .cloned()
945            .unwrap_or_default();
946
947        let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
948        Ok(VmValue::List(Rc::new(vm_resources)))
949    });
950
951    vm.register_async_builtin("mcp_read_resource", |args| async move {
952        let client = match args.first() {
953            Some(VmValue::McpClient(c)) => c.clone(),
954            _ => {
955                return Err(VmError::Thrown(VmValue::String(Rc::from(
956                    "mcp_read_resource: first argument must be an MCP client",
957                ))));
958            }
959        };
960
961        let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
962        if uri.is_empty() {
963            return Err(VmError::Thrown(VmValue::String(Rc::from(
964                "mcp_read_resource: URI is required",
965            ))));
966        }
967
968        let result = client
969            .call("resources/read", serde_json::json!({ "uri": uri }))
970            .await?;
971
972        let contents = result
973            .get("contents")
974            .and_then(|c| c.as_array())
975            .cloned()
976            .unwrap_or_default();
977
978        if contents.len() == 1 {
979            if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
980                return Ok(VmValue::String(Rc::from(text)));
981            }
982        }
983
984        if contents.is_empty() {
985            Ok(VmValue::Nil)
986        } else {
987            Ok(VmValue::List(Rc::new(
988                contents.iter().map(json_to_vm_value).collect(),
989            )))
990        }
991    });
992
993    vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
994        let client = match args.first() {
995            Some(VmValue::McpClient(c)) => c.clone(),
996            _ => {
997                return Err(VmError::Thrown(VmValue::String(Rc::from(
998                    "mcp_list_resource_templates: argument must be an MCP client",
999                ))));
1000            }
1001        };
1002
1003        let result = client
1004            .call("resources/templates/list", serde_json::json!({}))
1005            .await?;
1006
1007        let templates = result
1008            .get("resourceTemplates")
1009            .and_then(|r| r.as_array())
1010            .cloned()
1011            .unwrap_or_default();
1012
1013        let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
1014        Ok(VmValue::List(Rc::new(vm_templates)))
1015    });
1016
1017    vm.register_async_builtin("mcp_list_prompts", |args| async move {
1018        let client = match args.first() {
1019            Some(VmValue::McpClient(c)) => c.clone(),
1020            _ => {
1021                return Err(VmError::Thrown(VmValue::String(Rc::from(
1022                    "mcp_list_prompts: argument must be an MCP client",
1023                ))));
1024            }
1025        };
1026
1027        let result = client.call("prompts/list", serde_json::json!({})).await?;
1028
1029        let prompts = result
1030            .get("prompts")
1031            .and_then(|p| p.as_array())
1032            .cloned()
1033            .unwrap_or_default();
1034
1035        let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
1036        Ok(VmValue::List(Rc::new(vm_prompts)))
1037    });
1038
1039    vm.register_async_builtin("mcp_get_prompt", |args| async move {
1040        let client = match args.first() {
1041            Some(VmValue::McpClient(c)) => c.clone(),
1042            _ => {
1043                return Err(VmError::Thrown(VmValue::String(Rc::from(
1044                    "mcp_get_prompt: first argument must be an MCP client",
1045                ))));
1046            }
1047        };
1048
1049        let name = args.get(1).map(|a| a.display()).unwrap_or_default();
1050        if name.is_empty() {
1051            return Err(VmError::Thrown(VmValue::String(Rc::from(
1052                "mcp_get_prompt: prompt name is required",
1053            ))));
1054        }
1055
1056        let arguments = match args.get(2) {
1057            Some(VmValue::Dict(d)) => {
1058                let obj: serde_json::Map<String, serde_json::Value> = d
1059                    .iter()
1060                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1061                    .collect();
1062                serde_json::Value::Object(obj)
1063            }
1064            _ => serde_json::json!({}),
1065        };
1066
1067        let result = client
1068            .call(
1069                "prompts/get",
1070                serde_json::json!({
1071                    "name": name,
1072                    "arguments": arguments,
1073                }),
1074            )
1075            .await?;
1076
1077        Ok(json_to_vm_value(&result))
1078    });
1079}
1080
1081#[cfg(test)]
1082mod tests {
1083    use super::*;
1084
1085    #[test]
1086    fn test_vm_value_to_serde_string() {
1087        let val = VmValue::String(Rc::from("hello"));
1088        let json = vm_value_to_serde(&val);
1089        assert_eq!(json, serde_json::json!("hello"));
1090    }
1091
1092    #[test]
1093    fn test_vm_value_to_serde_dict() {
1094        let mut map = BTreeMap::new();
1095        map.insert("key".to_string(), VmValue::Int(42));
1096        let val = VmValue::Dict(Rc::new(map));
1097        let json = vm_value_to_serde(&val);
1098        assert_eq!(json, serde_json::json!({"key": 42}));
1099    }
1100
1101    #[test]
1102    fn test_vm_value_to_serde_list() {
1103        let val = VmValue::List(Rc::new(vec![VmValue::Int(1), VmValue::Int(2)]));
1104        let json = vm_value_to_serde(&val);
1105        assert_eq!(json, serde_json::json!([1, 2]));
1106    }
1107
1108    #[test]
1109    fn test_extract_content_text_single() {
1110        let result = serde_json::json!({
1111            "content": [{"type": "text", "text": "hello world"}],
1112            "isError": false
1113        });
1114        assert_eq!(extract_content_text(&result), "hello world");
1115    }
1116
1117    #[test]
1118    fn test_extract_content_text_multiple() {
1119        let result = serde_json::json!({
1120            "content": [
1121                {"type": "text", "text": "first"},
1122                {"type": "text", "text": "second"}
1123            ],
1124            "isError": false
1125        });
1126        assert_eq!(extract_content_text(&result), "first\nsecond");
1127    }
1128
1129    #[test]
1130    fn test_extract_content_text_fallback_json() {
1131        let result = serde_json::json!({
1132            "content": [{"type": "image", "data": "abc"}],
1133            "isError": false
1134        });
1135        let output = extract_content_text(&result);
1136        assert!(output.contains("image"));
1137    }
1138
1139    #[test]
1140    fn test_parse_sse_jsonrpc_body_uses_last_jsonrpc_message() {
1141        let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
1142        let parsed = parse_sse_jsonrpc_body(body).unwrap();
1143        assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
1144    }
1145}