Skip to main content

harn_vm/
mcp.rs

1//! MCP (Model Context Protocol) client for connecting to external tool servers.
2//!
3//! Supports stdio transport and streamable HTTP-style request/response transport.
4
5use std::collections::BTreeMap;
6use std::path::{Path, PathBuf};
7use std::rc::Rc;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use reqwest_eventsource::{Event as SseEvent, EventSource};
12use serde::Deserialize;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::{Child, ChildStdin, ChildStdout};
15use tokio::sync::Mutex;
16
17use crate::stdlib::json_to_vm_value;
18use crate::value::{VmError, VmValue};
19use crate::vm::Vm;
20
/// Protocol revision sent in the `initialize` handshake and used as the
/// `MCP-Protocol-Version` header value when a server spec does not override it.
const PROTOCOL_VERSION: &str = "2025-11-25";

/// Time limit (60 seconds) applied to each HTTP request and to each wait for
/// a line of stdio output from the server.
const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
26
27#[derive(Clone, Debug, Deserialize)]
28#[serde(rename_all = "lowercase")]
29enum McpTransport {
30    Stdio,
31    Http,
32}
33
34#[derive(Clone, Debug, Deserialize)]
35pub struct McpServerSpec {
36    pub name: String,
37    #[serde(default = "default_transport")]
38    transport: McpTransport,
39    #[serde(default)]
40    pub command: String,
41    #[serde(default)]
42    pub args: Vec<String>,
43    #[serde(default)]
44    pub env: BTreeMap<String, String>,
45    #[serde(default)]
46    pub url: String,
47    #[serde(default)]
48    pub auth_token: Option<String>,
49    #[serde(default)]
50    pub protocol_version: Option<String>,
51    #[serde(default)]
52    pub proxy_server_name: Option<String>,
53}
54
55fn default_transport() -> McpTransport {
56    McpTransport::Stdio
57}
58
59/// Internal state for an MCP client connection.
60enum McpClientInner {
61    Stdio(StdioMcpClientInner),
62    Http(HttpMcpClientInner),
63}
64
65struct StdioMcpClientInner {
66    child: Child,
67    stdin: ChildStdin,
68    reader: BufReader<ChildStdout>,
69    next_id: u64,
70}
71
72struct HttpMcpClientInner {
73    client: reqwest::Client,
74    url: String,
75    auth_token: Option<String>,
76    protocol_version: String,
77    session_id: Option<String>,
78    next_id: u64,
79    proxy_server_name: Option<String>,
80    get_stream_task: Option<tokio::task::JoinHandle<()>>,
81}
82
/// One filesystem root exposed to MCP servers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct McpRoot {
    /// Filesystem path of the root (lossily converted to a string).
    path: String,
    /// `file://` URI built from the path.
    uri: String,
    /// Short display name for the root.
    name: String,
}
89
90impl McpRoot {
91    fn protocol_json(&self) -> serde_json::Value {
92        serde_json::json!({
93            "uri": self.uri,
94            "name": self.name,
95        })
96    }
97
98    fn script_json(&self) -> serde_json::Value {
99        serde_json::json!({
100            "uri": self.uri,
101            "name": self.name,
102            "path": self.path,
103        })
104    }
105}
106
107impl HttpMcpClientInner {
108    fn abort_get_stream(&mut self) {
109        if let Some(task) = self.get_stream_task.take() {
110            task.abort();
111        }
112    }
113}
114
115/// Handle to an MCP client connection, stored in VmValue.
116#[derive(Clone)]
117pub struct VmMcpClientHandle {
118    pub name: String,
119    inner: Arc<Mutex<Option<McpClientInner>>>,
120    last_roots: Arc<Mutex<Vec<McpRoot>>>,
121    pub(crate) initialize_result: Arc<Mutex<Option<serde_json::Value>>>,
122}
123
124impl std::fmt::Debug for VmMcpClientHandle {
125    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126        write!(f, "McpClient({})", self.name)
127    }
128}
129
130impl VmMcpClientHandle {
131    pub(crate) async fn call(
132        &self,
133        method: &str,
134        params: serde_json::Value,
135    ) -> Result<serde_json::Value, VmError> {
136        if method != "initialize" {
137            self.notify_roots_list_changed_if_needed().await?;
138        }
139        let mut guard = self.inner.lock().await;
140        let inner = guard
141            .as_mut()
142            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
143
144        match inner {
145            McpClientInner::Stdio(inner) => stdio_call(inner, &self.name, method, params).await,
146            McpClientInner::Http(inner) => http_call(inner, &self.name, method, params).await,
147        }
148    }
149
150    async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
151        let mut guard = self.inner.lock().await;
152        let inner = guard
153            .as_mut()
154            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
155
156        match inner {
157            McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
158            McpClientInner::Http(inner) => http_notify(inner, &self.name, method, params).await,
159        }
160    }
161
162    pub(crate) async fn disconnect(&self) -> Result<(), VmError> {
163        let mut guard = self.inner.lock().await;
164        if let Some(inner) = guard.take() {
165            match inner {
166                McpClientInner::Stdio(mut inner) => {
167                    let _ = inner.child.kill().await;
168                }
169                McpClientInner::Http(mut inner) => {
170                    inner.abort_get_stream();
171                }
172            }
173        }
174        Ok(())
175    }
176
177    async fn notify_roots_list_changed_if_needed(&self) -> Result<(), VmError> {
178        let roots = current_mcp_roots();
179        let mut last_roots = self.last_roots.lock().await;
180        if *last_roots == roots {
181            return Ok(());
182        }
183
184        self.notify(
185            crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION,
186            serde_json::json!({}),
187        )
188        .await?;
189        *last_roots = roots;
190        Ok(())
191    }
192}
193
194async fn stdio_call(
195    inner: &mut StdioMcpClientInner,
196    server_name: &str,
197    method: &str,
198    params: serde_json::Value,
199) -> Result<serde_json::Value, VmError> {
200    let id = inner.next_id;
201    inner.next_id += 1;
202
203    let request = serde_json::json!({
204        "jsonrpc": "2.0",
205        "id": id,
206        "method": method,
207        "params": params,
208    });
209
210    let line = serde_json::to_string(&request)
211        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
212    inner
213        .stdin
214        .write_all(line.as_bytes())
215        .await
216        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
217    inner
218        .stdin
219        .write_all(b"\n")
220        .await
221        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
222    inner
223        .stdin
224        .flush()
225        .await
226        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
227
228    let mut line_buf = String::new();
229    loop {
230        line_buf.clear();
231        let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
232            .await
233            .map_err(|_| {
234                VmError::Runtime(format!(
235                    "MCP: server did not respond to '{method}' within {}s",
236                    MCP_TIMEOUT.as_secs()
237                ))
238            })?
239            .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
240
241        if bytes_read == 0 {
242            return Err(VmError::Runtime("MCP: server closed connection".into()));
243        }
244
245        let trimmed = line_buf.trim();
246        if trimmed.is_empty() {
247            continue;
248        }
249
250        let msg: serde_json::Value = match serde_json::from_str(trimmed) {
251            Ok(v) => v,
252            Err(_) => continue,
253        };
254
255        if msg.get("id").is_none() {
256            continue;
257        }
258
259        if msg["id"].as_u64() == Some(id)
260            && (msg.get("result").is_some() || msg.get("error").is_some())
261        {
262            return parse_jsonrpc_result(msg);
263        }
264
265        let response = match handle_inbound_client_request(server_name, &msg).await {
266            Some(response) => response,
267            None => continue,
268        };
269        let line = serde_json::to_string(&response)
270            .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
271        inner
272            .stdin
273            .write_all(line.as_bytes())
274            .await
275            .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
276        inner
277            .stdin
278            .write_all(b"\n")
279            .await
280            .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
281        inner
282            .stdin
283            .flush()
284            .await
285            .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
286    }
287}
288
289/// Handle a server-to-client request that arrived on the stream while
290/// we were waiting for a response. Returns `Some(response)` to send
291/// back to the server, or `None` if the message wasn't actually a
292/// request (e.g. an unknown notification we should ignore).
293async fn handle_inbound_client_request(
294    server_name: &str,
295    msg: &serde_json::Value,
296) -> Option<serde_json::Value> {
297    let method = msg.get("method").and_then(|value| value.as_str())?;
298    if method == crate::mcp_elicit::ELICITATION_METHOD {
299        return Some(crate::mcp_elicit::dispatch_inbound_elicitation(server_name, msg).await);
300    }
301    if method == crate::mcp_sampling::SAMPLING_METHOD {
302        return Some(crate::mcp_sampling::dispatch_inbound_sampling(server_name, msg).await);
303    }
304    if method == crate::mcp_protocol::METHOD_ROOTS_LIST {
305        let id = msg.get("id")?.clone();
306        return Some(harn_roots_list_response(id));
307    }
308    client_request_rejection(msg)
309}
310
311async fn stdio_notify(
312    inner: &mut StdioMcpClientInner,
313    method: &str,
314    params: serde_json::Value,
315) -> Result<(), VmError> {
316    let notification = serde_json::json!({
317        "jsonrpc": "2.0",
318        "method": method,
319        "params": params,
320    });
321
322    let line = serde_json::to_string(&notification)
323        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
324    inner
325        .stdin
326        .write_all(line.as_bytes())
327        .await
328        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
329    inner
330        .stdin
331        .write_all(b"\n")
332        .await
333        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
334    inner
335        .stdin
336        .flush()
337        .await
338        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
339    Ok(())
340}
341
342async fn http_call(
343    inner: &mut HttpMcpClientInner,
344    server_name: &str,
345    method: &str,
346    params: serde_json::Value,
347) -> Result<serde_json::Value, VmError> {
348    let id = inner.next_id;
349    inner.next_id += 1;
350    send_http_request(inner, server_name, method, params, Some(id)).await
351}
352
353async fn http_notify(
354    inner: &mut HttpMcpClientInner,
355    server_name: &str,
356    method: &str,
357    params: serde_json::Value,
358) -> Result<(), VmError> {
359    let _ = send_http_request(inner, server_name, method, params, None).await?;
360    Ok(())
361}
362
363async fn send_http_request(
364    inner: &mut HttpMcpClientInner,
365    server_name: &str,
366    method: &str,
367    params: serde_json::Value,
368    id: Option<u64>,
369) -> Result<serde_json::Value, VmError> {
370    for attempt in 0..2 {
371        let response = send_http_request_once(inner, method, params.clone(), id).await?;
372
373        let status = response.status().as_u16();
374        let headers = response.headers().clone();
375        if let Some(protocol_version) = headers
376            .get("MCP-Protocol-Version")
377            .and_then(|v| v.to_str().ok())
378        {
379            inner.protocol_version = protocol_version.to_string();
380        }
381        if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
382            inner.session_id = Some(session_id.to_string());
383        }
384
385        if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
386            inner.session_id = None;
387            inner.abort_get_stream();
388            reinitialize_http_client(inner).await?;
389            continue;
390        }
391
392        if status == 401 {
393            return Err(VmError::Thrown(VmValue::String(Rc::from(
394                "MCP authorization required",
395            ))));
396        }
397
398        let body = response
399            .text()
400            .await
401            .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
402
403        if body.trim().is_empty() {
404            if status < 400 {
405                ensure_http_get_stream(inner, server_name);
406            }
407            return Ok(serde_json::Value::Null);
408        }
409
410        let msg = parse_http_response_body(inner, server_name, &body, status, id).await?;
411
412        if status >= 400 {
413            return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
414        }
415
416        ensure_http_get_stream(inner, server_name);
417        if id.is_none() {
418            return Ok(msg);
419        }
420        return parse_jsonrpc_result(msg);
421    }
422
423    Err(VmError::Runtime("MCP HTTP request failed".into()))
424}
425
426async fn send_http_request_once(
427    inner: &mut HttpMcpClientInner,
428    method: &str,
429    params: serde_json::Value,
430    id: Option<u64>,
431) -> Result<reqwest::Response, VmError> {
432    let mut payload = serde_json::json!({
433        "jsonrpc": "2.0",
434        "method": method,
435        "params": params,
436    });
437    if let Some(id) = id {
438        payload["id"] = serde_json::json!(id);
439    }
440    let payload = wrap_http_payload(payload, inner.proxy_server_name.as_deref());
441
442    let request = inner
443        .client
444        .post(&inner.url)
445        .header("Content-Type", "application/json")
446        .header("Accept", "application/json, text/event-stream")
447        .json(&payload);
448    let request = apply_http_headers(
449        request,
450        &inner.auth_token,
451        &inner.protocol_version,
452        inner.session_id.as_deref(),
453    );
454
455    request
456        .timeout(MCP_TIMEOUT)
457        .send()
458        .await
459        .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
460}
461
462fn ensure_http_get_stream(inner: &mut HttpMcpClientInner, server_name: &str) {
463    if server_name.is_empty() {
464        return;
465    }
466    if inner
467        .get_stream_task
468        .as_ref()
469        .is_some_and(|task| !task.is_finished())
470    {
471        return;
472    }
473
474    let config = HttpStreamConfig {
475        client: inner.client.clone(),
476        url: inner.url.clone(),
477        auth_token: inner.auth_token.clone(),
478        protocol_version: inner.protocol_version.clone(),
479        session_id: inner.session_id.clone(),
480        proxy_server_name: inner.proxy_server_name.clone(),
481        server_name: server_name.to_string(),
482    };
483    inner.get_stream_task = Some(tokio::task::spawn_local(run_http_get_stream(config)));
484}
485
486#[derive(Clone)]
487struct HttpStreamConfig {
488    client: reqwest::Client,
489    url: String,
490    auth_token: Option<String>,
491    protocol_version: String,
492    session_id: Option<String>,
493    proxy_server_name: Option<String>,
494    server_name: String,
495}
496
497async fn run_http_get_stream(config: HttpStreamConfig) {
498    let request = apply_http_headers(
499        config
500            .client
501            .get(&config.url)
502            .header("Accept", "text/event-stream"),
503        &config.auth_token,
504        &config.protocol_version,
505        config.session_id.as_deref(),
506    );
507    let Ok(mut stream) = EventSource::new(request) else {
508        return;
509    };
510
511    while let Some(event) = stream.next().await {
512        match event {
513            Ok(SseEvent::Open) => {}
514            Ok(SseEvent::Message(message)) => {
515                if message.data.trim().is_empty() {
516                    continue;
517                }
518                let Ok(msg) = serde_json::from_str::<serde_json::Value>(&message.data) else {
519                    tracing::debug!("MCP HTTP GET stream received non-JSON event");
520                    continue;
521                };
522                if let Some(response) =
523                    handle_inbound_client_request(&config.server_name, &msg).await
524                {
525                    let _ = post_http_jsonrpc_payload(&config, response).await;
526                }
527            }
528            Err(error) => {
529                tracing::debug!("MCP HTTP GET stream ended with error: {error}");
530                break;
531            }
532        }
533    }
534    stream.close();
535}
536
537async fn post_http_jsonrpc_payload(
538    config: &HttpStreamConfig,
539    payload: serde_json::Value,
540) -> Result<(), VmError> {
541    let payload = wrap_http_payload(payload, config.proxy_server_name.as_deref());
542    let request = config
543        .client
544        .post(&config.url)
545        .header("Content-Type", "application/json")
546        .header("Accept", "application/json, text/event-stream")
547        .json(&payload)
548        .timeout(MCP_TIMEOUT);
549    let request = apply_http_headers(
550        request,
551        &config.auth_token,
552        &config.protocol_version,
553        config.session_id.as_deref(),
554    );
555    let response = request
556        .send()
557        .await
558        .map_err(|e| VmError::Runtime(format!("MCP HTTP response POST error: {e}")))?;
559    if response.status().is_success() {
560        Ok(())
561    } else {
562        Err(VmError::Runtime(format!(
563            "MCP HTTP response POST returned {}",
564            response.status()
565        )))
566    }
567}
568
569fn apply_http_headers(
570    mut request: reqwest::RequestBuilder,
571    auth_token: &Option<String>,
572    protocol_version: &str,
573    session_id: Option<&str>,
574) -> reqwest::RequestBuilder {
575    request = request.header("MCP-Protocol-Version", protocol_version);
576    if let Some(token) = auth_token {
577        request = request.header("Authorization", format!("Bearer {token}"));
578    }
579    if let Some(session_id) = session_id {
580        request = request.header("MCP-Session-Id", session_id);
581    }
582    request
583}
584
585fn wrap_http_payload(
586    payload: serde_json::Value,
587    proxy_server_name: Option<&str>,
588) -> serde_json::Value {
589    let Some(proxy_server_name) = proxy_server_name else {
590        return payload;
591    };
592    let mut wrapped = serde_json::Map::new();
593    wrapped.insert(
594        "serverName".to_string(),
595        serde_json::Value::String(proxy_server_name.to_string()),
596    );
597    if let Some(object) = payload.as_object() {
598        for (key, value) in object {
599            wrapped.insert(key.clone(), value.clone());
600        }
601    }
602    serde_json::Value::Object(wrapped)
603}
604
605async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
606    let initialize = send_http_request_once(
607        inner,
608        "initialize",
609        serde_json::json!({
610            "protocolVersion": PROTOCOL_VERSION,
611            "capabilities": {
612                "elicitation": {},
613                "roots": {
614                    "listChanged": true,
615                },
616                "sampling": {},
617            },
618            "clientInfo": {
619                "name": "harn",
620                "version": env!("CARGO_PKG_VERSION"),
621            }
622        }),
623        Some(0),
624    )
625    .await?;
626    if let Some(protocol_version) = initialize
627        .headers()
628        .get("MCP-Protocol-Version")
629        .and_then(|v| v.to_str().ok())
630    {
631        inner.protocol_version = protocol_version.to_string();
632    }
633    if let Some(session_id) = initialize
634        .headers()
635        .get("MCP-Session-Id")
636        .and_then(|v| v.to_str().ok())
637    {
638        inner.session_id = Some(session_id.to_string());
639    }
640    let status = initialize.status().as_u16();
641    let body = initialize
642        .text()
643        .await
644        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
645    let msg = parse_http_response_body(inner, "", &body, status, Some(0)).await?;
646    if status >= 400 {
647        return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
648    }
649    let _ = parse_jsonrpc_result(msg)?;
650    let response = send_http_request_once(
651        inner,
652        "notifications/initialized",
653        serde_json::json!({}),
654        None,
655    )
656    .await?;
657    let status = response.status().as_u16();
658    if let Some(protocol_version) = response
659        .headers()
660        .get("MCP-Protocol-Version")
661        .and_then(|v| v.to_str().ok())
662    {
663        inner.protocol_version = protocol_version.to_string();
664    }
665    if let Some(session_id) = response
666        .headers()
667        .get("MCP-Session-Id")
668        .and_then(|v| v.to_str().ok())
669    {
670        inner.session_id = Some(session_id.to_string());
671    }
672    let body = response
673        .text()
674        .await
675        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
676    if body.trim().is_empty() || status < 400 {
677        return Ok(());
678    }
679    let msg = parse_http_response_body(inner, "", &body, status, None).await?;
680    Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
681}
682
683async fn parse_http_response_body(
684    inner: &HttpMcpClientInner,
685    server_name: &str,
686    body: &str,
687    status: u16,
688    request_id: Option<u64>,
689) -> Result<serde_json::Value, VmError> {
690    if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
691        return parse_sse_jsonrpc_body(inner, server_name, body, request_id).await;
692    }
693    serde_json::from_str(body).map_err(|e| {
694        VmError::Runtime(format!(
695            "MCP HTTP response parse error (status {status}): {e}"
696        ))
697    })
698}
699
700async fn parse_sse_jsonrpc_body(
701    inner: &HttpMcpClientInner,
702    server_name: &str,
703    body: &str,
704    request_id: Option<u64>,
705) -> Result<serde_json::Value, VmError> {
706    let mut current_data = Vec::new();
707    let mut messages = Vec::new();
708
709    for line in body.lines() {
710        if line.is_empty() {
711            if !current_data.is_empty() {
712                messages.push(current_data.join("\n"));
713                current_data.clear();
714            }
715            continue;
716        }
717        if let Some(data) = line.strip_prefix("data:") {
718            current_data.push(data.trim_start().to_string());
719        }
720    }
721    if !current_data.is_empty() {
722        messages.push(current_data.join("\n"));
723    }
724
725    let config = HttpStreamConfig {
726        client: inner.client.clone(),
727        url: inner.url.clone(),
728        auth_token: inner.auth_token.clone(),
729        protocol_version: inner.protocol_version.clone(),
730        session_id: inner.session_id.clone(),
731        proxy_server_name: inner.proxy_server_name.clone(),
732        server_name: server_name.to_string(),
733    };
734
735    let mut fallback = None;
736    for message in messages {
737        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
738            if request_id.is_some()
739                && value["id"].as_u64() == request_id
740                && (value.get("result").is_some() || value.get("error").is_some())
741            {
742                return Ok(value);
743            }
744            if let Some(response) = handle_inbound_client_request(server_name, &value).await {
745                let _ = post_http_jsonrpc_payload(&config, response).await;
746                continue;
747            }
748            if value.get("result").is_some() || value.get("error").is_some() {
749                fallback = Some(value);
750            }
751        }
752    }
753
754    fallback.ok_or_else(|| {
755        VmError::Runtime(
756            "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
757        )
758    })
759}
760
761fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
762    if let Some(error) = msg.get("error") {
763        return Err(jsonrpc_error_to_vm_error(error));
764    }
765    Ok(msg
766        .get("result")
767        .cloned()
768        .unwrap_or(serde_json::Value::Null))
769}
770
771fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
772    let message = error
773        .get("message")
774        .and_then(|v| v.as_str())
775        .unwrap_or("Unknown MCP error");
776    let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
777    VmError::Thrown(VmValue::String(Rc::from(format!(
778        "MCP error ({code}): {message}"
779    ))))
780}
781
782fn client_request_rejection(msg: &serde_json::Value) -> Option<serde_json::Value> {
783    let request_id = msg.get("id")?.clone();
784    let method = msg.get("method").and_then(|value| value.as_str())?;
785    crate::mcp_protocol::unsupported_latest_spec_method_response(request_id.clone(), method)
786        .or_else(|| {
787            Some(crate::jsonrpc::error_response(
788                request_id,
789                -32601,
790                &format!("Method not found: {method}"),
791            ))
792        })
793}
794
795fn harn_roots_list_response(id: serde_json::Value) -> serde_json::Value {
796    crate::jsonrpc::response(
797        id,
798        serde_json::json!({
799            "roots": current_mcp_roots()
800                .iter()
801                .map(McpRoot::protocol_json)
802                .collect::<Vec<_>>()
803        }),
804    )
805}
806
807pub(crate) fn current_mcp_roots() -> Vec<McpRoot> {
808    compact_root_paths(current_mcp_root_candidates())
809        .into_iter()
810        .filter_map(|path| {
811            let uri = url::Url::from_file_path(&path).ok()?.to_string();
812            Some(McpRoot {
813                name: root_display_name(&path),
814                path: path.to_string_lossy().into_owned(),
815                uri,
816            })
817        })
818        .collect()
819}
820
821fn current_mcp_root_candidates() -> Vec<PathBuf> {
822    let mut candidates = Vec::new();
823    if let Some(context) = crate::stdlib::process::current_execution_context() {
824        if let Some(path) = non_empty_path(context.worktree_path.as_deref()) {
825            candidates.push(path);
826        }
827        if let Some(cwd) = non_empty_path(context.cwd.as_deref()) {
828            push_project_root_or_path(&mut candidates, cwd);
829        }
830        if let Some(source_dir) = non_empty_path(context.source_dir.as_deref()) {
831            push_project_root_or_path(&mut candidates, source_dir);
832        }
833    } else {
834        push_project_root_or_path(
835            &mut candidates,
836            crate::stdlib::process::execution_root_path(),
837        );
838        push_project_root_or_path(&mut candidates, crate::stdlib::process::source_root_path());
839    }
840
841    if candidates.is_empty() {
842        candidates.push(crate::stdlib::process::execution_root_path());
843    }
844    candidates
845}
846
/// Turns an optional string into a path, treating `None`, empty and
/// whitespace-only input as absent. The text itself is not trimmed.
fn non_empty_path(raw: Option<&str>) -> Option<PathBuf> {
    match raw {
        Some(text) if !text.trim().is_empty() => Some(PathBuf::from(text)),
        _ => None,
    }
}
851
852fn push_project_root_or_path(candidates: &mut Vec<PathBuf>, path: PathBuf) {
853    let normalized = crate::stdlib::process::normalize_context_path(&path);
854    match crate::stdlib::process::find_project_root(&normalized) {
855        Some(root) => candidates.push(root),
856        None => candidates.push(normalized),
857    }
858}
859
860fn compact_root_paths(paths: Vec<PathBuf>) -> Vec<PathBuf> {
861    let mut normalized = paths
862        .into_iter()
863        .map(normalize_root_path)
864        .collect::<Vec<_>>();
865    normalized.sort_by_key(|path| {
866        (
867            path.components().count(),
868            path.to_string_lossy().to_string(),
869        )
870    });
871
872    let mut roots: Vec<PathBuf> = Vec::new();
873    for path in normalized {
874        if roots
875            .iter()
876            .any(|existing| path == *existing || path.starts_with(existing))
877        {
878            continue;
879        }
880        roots.push(path);
881    }
882    roots
883}
884
885fn normalize_root_path(path: PathBuf) -> PathBuf {
886    let absolute = crate::stdlib::process::normalize_context_path(&path);
887    std::fs::canonicalize(&absolute).unwrap_or(absolute)
888}
889
/// Picks a display name for a root: its final path component when that is
/// non-empty valid UTF-8, otherwise the whole path as displayed.
fn root_display_name(path: &Path) -> String {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name.to_string(),
        _ => path.display().to_string(),
    }
}
897
898async fn mcp_connect_stdio_impl(
899    command: &str,
900    args: &[String],
901    env: &BTreeMap<String, String>,
902) -> Result<VmMcpClientHandle, VmError> {
903    let mut cmd = tokio::process::Command::new(command);
904    cmd.args(args)
905        .stdin(std::process::Stdio::piped())
906        .stdout(std::process::Stdio::piped())
907        .stderr(std::process::Stdio::inherit())
908        .envs(env);
909
910    let mut child = cmd.spawn().map_err(|e| {
911        VmError::Thrown(VmValue::String(Rc::from(format!(
912            "mcp_connect: failed to spawn '{command}': {e}"
913        ))))
914    })?;
915
916    let stdin = child
917        .stdin
918        .take()
919        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
920    let stdout = child
921        .stdout
922        .take()
923        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
924
925    let handle = VmMcpClientHandle {
926        name: command.to_string(),
927        inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
928            StdioMcpClientInner {
929                child,
930                stdin,
931                reader: BufReader::new(stdout),
932                next_id: 1,
933            },
934        )))),
935        last_roots: Arc::new(Mutex::new(Vec::new())),
936        initialize_result: Arc::new(Mutex::new(None)),
937    };
938
939    initialize_client(&handle).await?;
940    Ok(handle)
941}
942
943async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
944    let client = reqwest::Client::builder()
945        .build()
946        .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
947
948    let handle = VmMcpClientHandle {
949        name: spec.name.clone(),
950        inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
951            client,
952            url: spec.url.clone(),
953            auth_token: spec.auth_token.clone(),
954            protocol_version: spec
955                .protocol_version
956                .clone()
957                .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
958            session_id: None,
959            next_id: 1,
960            proxy_server_name: spec.proxy_server_name.clone(),
961            get_stream_task: None,
962        })))),
963        last_roots: Arc::new(Mutex::new(Vec::new())),
964        initialize_result: Arc::new(Mutex::new(None)),
965    };
966
967    initialize_client(&handle).await?;
968    Ok(handle)
969}
970
971async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
972    let initialize_result = handle
973        .call(
974            "initialize",
975            serde_json::json!({
976                "protocolVersion": PROTOCOL_VERSION,
977                "capabilities": {
978                    "elicitation": {},
979                    "roots": {
980                        "listChanged": true,
981                    },
982                    "sampling": {},
983                },
984                "clientInfo": {
985                    "name": "harn",
986                    "version": env!("CARGO_PKG_VERSION"),
987                }
988            }),
989        )
990        .await?;
991    *handle.initialize_result.lock().await = Some(initialize_result);
992
993    handle
994        .notify("notifications/initialized", serde_json::json!({}))
995        .await?;
996
997    Ok(())
998}
999
1000pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
1001    match val {
1002        VmValue::String(s) => serde_json::Value::String(s.to_string()),
1003        VmValue::Int(n) => serde_json::json!(*n),
1004        VmValue::Float(n) => serde_json::json!(*n),
1005        VmValue::Bool(b) => serde_json::Value::Bool(*b),
1006        VmValue::Nil => serde_json::Value::Null,
1007        VmValue::List(items) => {
1008            serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
1009        }
1010        VmValue::Dict(map) => {
1011            let obj: serde_json::Map<String, serde_json::Value> = map
1012                .iter()
1013                .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1014                .collect();
1015            serde_json::Value::Object(obj)
1016        }
1017        _ => serde_json::Value::Null,
1018    }
1019}
1020
1021fn extract_content_text(result: &serde_json::Value) -> String {
1022    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
1023        let texts: Vec<&str> = content
1024            .iter()
1025            .filter_map(|item| {
1026                if item.get("type").and_then(|t| t.as_str()) == Some("text") {
1027                    item.get("text").and_then(|t| t.as_str())
1028                } else {
1029                    None
1030                }
1031            })
1032            .collect();
1033        if texts.is_empty() {
1034            json_to_vm_value(result).display()
1035        } else {
1036            texts.join("\n")
1037        }
1038    } else {
1039        json_to_vm_value(result).display()
1040    }
1041}
1042
1043pub(crate) async fn call_mcp_tool(
1044    client: &VmMcpClientHandle,
1045    tool_name: &str,
1046    arguments: serde_json::Value,
1047) -> Result<serde_json::Value, VmError> {
1048    let result = client
1049        .call(
1050            "tools/call",
1051            serde_json::json!({
1052                "name": tool_name,
1053                "arguments": arguments,
1054            }),
1055        )
1056        .await?;
1057
1058    if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
1059        let error_text = extract_content_text(&result);
1060        return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
1061    }
1062
1063    let content = result
1064        .get("content")
1065        .and_then(|c| c.as_array())
1066        .cloned()
1067        .unwrap_or_default();
1068
1069    if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
1070        if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
1071            return Ok(serde_json::Value::String(text.to_string()));
1072        }
1073    }
1074
1075    if content.is_empty() {
1076        Ok(serde_json::Value::Null)
1077    } else {
1078        Ok(serde_json::Value::Array(content))
1079    }
1080}
1081
1082pub async fn connect_mcp_server(
1083    name: &str,
1084    command: &str,
1085    args: &[String],
1086) -> Result<VmMcpClientHandle, VmError> {
1087    let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
1088    handle.name = name.to_string();
1089    Ok(handle)
1090}
1091
1092pub async fn connect_mcp_server_from_spec(
1093    spec: &McpServerSpec,
1094) -> Result<VmMcpClientHandle, VmError> {
1095    let mut handle = match spec.transport {
1096        McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
1097        McpTransport::Http => mcp_connect_http_impl(spec).await?,
1098    };
1099    handle.name = spec.name.clone();
1100    Ok(handle)
1101}
1102
1103pub async fn connect_mcp_server_from_json(
1104    value: &serde_json::Value,
1105) -> Result<VmMcpClientHandle, VmError> {
1106    let spec: McpServerSpec = serde_json::from_value(value.clone())
1107        .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
1108    connect_mcp_server_from_spec(&spec).await
1109}
1110
1111pub fn register_mcp_builtins(vm: &mut Vm) {
1112    vm.register_builtin("mcp_roots", mcp_roots_builtin);
1113    vm.register_builtin("harn.mcp.roots", mcp_roots_builtin);
1114    register_harn_mcp_namespace(vm);
1115
1116    vm.register_async_builtin("mcp_connect", |args| async move {
1117        let command = args.first().map(|a| a.display()).unwrap_or_default();
1118        if command.is_empty() {
1119            return Err(VmError::Thrown(VmValue::String(Rc::from(
1120                "mcp_connect: command is required",
1121            ))));
1122        }
1123
1124        let cmd_args: Vec<String> = match args.get(1) {
1125            Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
1126            _ => Vec::new(),
1127        };
1128
1129        let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
1130        Ok(VmValue::McpClient(handle))
1131    });
1132
1133    // Lazy registry: ensure a registered server is booted and return its
1134    // live client handle. Used by skill activation (`requires_mcp`) and
1135    // by user code that wants to trigger a lazy connect explicitly.
1136    vm.register_async_builtin("mcp_ensure_active", |args| async move {
1137        let name = match args.first() {
1138            Some(VmValue::String(s)) => s.to_string(),
1139            Some(other) => other.display(),
1140            None => String::new(),
1141        };
1142        if name.is_empty() {
1143            return Err(VmError::Thrown(VmValue::String(Rc::from(
1144                "mcp_ensure_active: server name is required",
1145            ))));
1146        }
1147        let handle = crate::mcp_registry::ensure_active(&name).await?;
1148        Ok(VmValue::McpClient(handle))
1149    });
1150
1151    // Decrement the binder refcount for a registered server. Called by
1152    // skill deactivation paths and by user code that manually bound via
1153    // `mcp_ensure_active`. No-op when the name isn't registered.
1154    vm.register_builtin("mcp_release", |args, _out| {
1155        let name = match args.first() {
1156            Some(VmValue::String(s)) => s.to_string(),
1157            Some(other) => other.display(),
1158            None => {
1159                return Err(VmError::Thrown(VmValue::String(Rc::from(
1160                    "mcp_release: server name is required",
1161                ))));
1162            }
1163        };
1164        crate::mcp_registry::release(&name);
1165        Ok(VmValue::Nil)
1166    });
1167
1168    // Return the declared MCP servers and their current state as a list
1169    // of dicts. Purely diagnostic — useful for `harn` scripts that want
1170    // to show connection state in a status-line or dashboard.
1171    vm.register_builtin("mcp_registry_status", |_args, _out| {
1172        let mut out = Vec::new();
1173        for entry in crate::mcp_registry::snapshot_status() {
1174            let mut dict = BTreeMap::new();
1175            dict.insert(
1176                "name".to_string(),
1177                VmValue::String(Rc::from(entry.name.as_str())),
1178            );
1179            dict.insert("lazy".to_string(), VmValue::Bool(entry.lazy));
1180            dict.insert("active".to_string(), VmValue::Bool(entry.active));
1181            dict.insert(
1182                "ref_count".to_string(),
1183                VmValue::Int(entry.ref_count as i64),
1184            );
1185            if let Some(card) = entry.card {
1186                dict.insert("card".to_string(), VmValue::String(Rc::from(card.as_str())));
1187            }
1188            out.push(VmValue::Dict(Rc::new(dict)));
1189        }
1190        Ok(VmValue::List(Rc::new(out)))
1191    });
1192
1193    // Fetch (or read from cache) the Server Card for a registered MCP
1194    // server, or from an explicit URL / local path.
1195    //
1196    // `mcp_server_card("notion")`           -> looks up `card = ...` in harn.toml
1197    // `mcp_server_card("https://.../card")` -> fetches that URL directly
1198    // `mcp_server_card("./card.json")`      -> reads that file directly
1199    vm.register_async_builtin("mcp_server_card", |args| async move {
1200        let target = match args.first() {
1201            Some(VmValue::String(s)) => s.to_string(),
1202            Some(other) => other.display(),
1203            None => {
1204                return Err(VmError::Thrown(VmValue::String(Rc::from(
1205                    "mcp_server_card: server name, URL, or path is required",
1206                ))));
1207            }
1208        };
1209
1210        // Source resolution: if the arg looks like a URL or path
1211        // (contains '/', '\\', or starts with a scheme), use it as-is.
1212        // Otherwise treat it as a registered server name and look up
1213        // its `card` field. This matches the user model: "I already
1214        // wrote down where the card lives in harn.toml — just use it."
1215        let source = if target.starts_with("http://")
1216            || target.starts_with("https://")
1217            || target.contains('/')
1218            || target.contains('\\')
1219            || target.ends_with(".json")
1220        {
1221            target.clone()
1222        } else {
1223            match crate::mcp_registry::get_registration(&target) {
1224                Some(reg) => match reg.card {
1225                    Some(card) => card,
1226                    None => {
1227                        return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1228                            "mcp_server_card: server '{target}' has no 'card' field in harn.toml"
1229                        )))));
1230                    }
1231                },
1232                None => {
1233                    return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1234                        "mcp_server_card: no MCP server '{target}' registered (check harn.toml) \
1235                         — pass a URL or path directly instead"
1236                    )))));
1237                }
1238            }
1239        };
1240
1241        let card = crate::mcp_card::fetch_server_card(&source, None)
1242            .await
1243            .map_err(|e| {
1244                VmError::Thrown(VmValue::String(Rc::from(format!("mcp_server_card: {e}"))))
1245            })?;
1246        Ok(json_to_vm_value(&card))
1247    });
1248
1249    vm.register_async_builtin("mcp_list_tools", |args| async move {
1250        let client = match args.first() {
1251            Some(VmValue::McpClient(c)) => c.clone(),
1252            _ => {
1253                return Err(VmError::Thrown(VmValue::String(Rc::from(
1254                    "mcp_list_tools: argument must be an MCP client",
1255                ))));
1256            }
1257        };
1258
1259        let result = client.call("tools/list", serde_json::json!({})).await?;
1260        let mut tools = result
1261            .get("tools")
1262            .and_then(|t| t.as_array())
1263            .cloned()
1264            .unwrap_or_default();
1265
1266        // Tag every tool with its originating server name so
1267        // downstream indexers (tool_search BM25) can surface them
1268        // under queries like "github" or "mcp:github". Harmless to
1269        // non-indexing callers — just an extra dict key.
1270        let server_name = client.name.clone();
1271        for tool in tools.iter_mut() {
1272            if let Some(obj) = tool.as_object_mut() {
1273                obj.entry("_mcp_server")
1274                    .or_insert_with(|| serde_json::Value::String(server_name.clone()));
1275            }
1276        }
1277
1278        let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
1279        Ok(VmValue::List(Rc::new(vm_tools)))
1280    });
1281
1282    vm.register_async_builtin("mcp_call", |args| async move {
1283        let client = match args.first() {
1284            Some(VmValue::McpClient(c)) => c.clone(),
1285            _ => {
1286                return Err(VmError::Thrown(VmValue::String(Rc::from(
1287                    "mcp_call: first argument must be an MCP client",
1288                ))));
1289            }
1290        };
1291
1292        let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
1293        if tool_name.is_empty() {
1294            return Err(VmError::Thrown(VmValue::String(Rc::from(
1295                "mcp_call: tool name is required",
1296            ))));
1297        }
1298
1299        let arguments = match args.get(2) {
1300            Some(VmValue::Dict(d)) => {
1301                let obj: serde_json::Map<String, serde_json::Value> = d
1302                    .iter()
1303                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1304                    .collect();
1305                serde_json::Value::Object(obj)
1306            }
1307            _ => serde_json::json!({}),
1308        };
1309
1310        Ok(json_to_vm_value(
1311            &call_mcp_tool(&client, &tool_name, arguments).await?,
1312        ))
1313    });
1314
1315    vm.register_async_builtin("mcp_server_info", |args| async move {
1316        let client = match args.first() {
1317            Some(VmValue::McpClient(c)) => c.clone(),
1318            _ => {
1319                return Err(VmError::Thrown(VmValue::String(Rc::from(
1320                    "mcp_server_info: argument must be an MCP client",
1321                ))));
1322            }
1323        };
1324
1325        let guard = client.inner.lock().await;
1326        if guard.is_none() {
1327            return Err(VmError::Runtime("MCP client is disconnected".into()));
1328        }
1329        drop(guard);
1330
1331        let mut info = BTreeMap::new();
1332        info.insert(
1333            "name".to_string(),
1334            VmValue::String(Rc::from(client.name.as_str())),
1335        );
1336        info.insert("connected".to_string(), VmValue::Bool(true));
1337        let initialize = client
1338            .initialize_result
1339            .lock()
1340            .await
1341            .clone()
1342            .unwrap_or(serde_json::Value::Null);
1343        if !initialize.is_null() {
1344            if let Some(instructions) = initialize
1345                .get("instructions")
1346                .or_else(|| {
1347                    initialize
1348                        .get("serverInfo")
1349                        .and_then(|value| value.get("instructions"))
1350                })
1351                .and_then(|value| value.as_str())
1352                .filter(|value| !value.is_empty())
1353            {
1354                info.insert(
1355                    "instructions".to_string(),
1356                    VmValue::String(Rc::from(instructions)),
1357                );
1358            }
1359            info.insert("initialize".to_string(), json_to_vm_value(&initialize));
1360        }
1361        Ok(VmValue::Dict(Rc::new(info)))
1362    });
1363
1364    vm.register_async_builtin("mcp_disconnect", |args| async move {
1365        let client = match args.first() {
1366            Some(VmValue::McpClient(c)) => c.clone(),
1367            _ => {
1368                return Err(VmError::Thrown(VmValue::String(Rc::from(
1369                    "mcp_disconnect: argument must be an MCP client",
1370                ))));
1371            }
1372        };
1373
1374        client.disconnect().await?;
1375        Ok(VmValue::Nil)
1376    });
1377
1378    vm.register_async_builtin("mcp_list_resources", |args| async move {
1379        let client = match args.first() {
1380            Some(VmValue::McpClient(c)) => c.clone(),
1381            _ => {
1382                return Err(VmError::Thrown(VmValue::String(Rc::from(
1383                    "mcp_list_resources: argument must be an MCP client",
1384                ))));
1385            }
1386        };
1387
1388        let result = client.call("resources/list", serde_json::json!({})).await?;
1389        let resources = result
1390            .get("resources")
1391            .and_then(|r| r.as_array())
1392            .cloned()
1393            .unwrap_or_default();
1394
1395        let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
1396        Ok(VmValue::List(Rc::new(vm_resources)))
1397    });
1398
1399    vm.register_async_builtin("mcp_read_resource", |args| async move {
1400        let client = match args.first() {
1401            Some(VmValue::McpClient(c)) => c.clone(),
1402            _ => {
1403                return Err(VmError::Thrown(VmValue::String(Rc::from(
1404                    "mcp_read_resource: first argument must be an MCP client",
1405                ))));
1406            }
1407        };
1408
1409        let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
1410        if uri.is_empty() {
1411            return Err(VmError::Thrown(VmValue::String(Rc::from(
1412                "mcp_read_resource: URI is required",
1413            ))));
1414        }
1415
1416        let result = client
1417            .call("resources/read", serde_json::json!({ "uri": uri }))
1418            .await?;
1419
1420        let contents = result
1421            .get("contents")
1422            .and_then(|c| c.as_array())
1423            .cloned()
1424            .unwrap_or_default();
1425
1426        if contents.len() == 1 {
1427            if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
1428                return Ok(VmValue::String(Rc::from(text)));
1429            }
1430        }
1431
1432        if contents.is_empty() {
1433            Ok(VmValue::Nil)
1434        } else {
1435            Ok(VmValue::List(Rc::new(
1436                contents.iter().map(json_to_vm_value).collect(),
1437            )))
1438        }
1439    });
1440
1441    vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
1442        let client = match args.first() {
1443            Some(VmValue::McpClient(c)) => c.clone(),
1444            _ => {
1445                return Err(VmError::Thrown(VmValue::String(Rc::from(
1446                    "mcp_list_resource_templates: argument must be an MCP client",
1447                ))));
1448            }
1449        };
1450
1451        let result = client
1452            .call("resources/templates/list", serde_json::json!({}))
1453            .await?;
1454
1455        let templates = result
1456            .get("resourceTemplates")
1457            .and_then(|r| r.as_array())
1458            .cloned()
1459            .unwrap_or_default();
1460
1461        let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
1462        Ok(VmValue::List(Rc::new(vm_templates)))
1463    });
1464
1465    vm.register_async_builtin("mcp_list_prompts", |args| async move {
1466        let client = match args.first() {
1467            Some(VmValue::McpClient(c)) => c.clone(),
1468            _ => {
1469                return Err(VmError::Thrown(VmValue::String(Rc::from(
1470                    "mcp_list_prompts: argument must be an MCP client",
1471                ))));
1472            }
1473        };
1474
1475        let result = client.call("prompts/list", serde_json::json!({})).await?;
1476
1477        let prompts = result
1478            .get("prompts")
1479            .and_then(|p| p.as_array())
1480            .cloned()
1481            .unwrap_or_default();
1482
1483        let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
1484        Ok(VmValue::List(Rc::new(vm_prompts)))
1485    });
1486
1487    vm.register_async_builtin("mcp_get_prompt", |args| async move {
1488        let client = match args.first() {
1489            Some(VmValue::McpClient(c)) => c.clone(),
1490            _ => {
1491                return Err(VmError::Thrown(VmValue::String(Rc::from(
1492                    "mcp_get_prompt: first argument must be an MCP client",
1493                ))));
1494            }
1495        };
1496
1497        let name = args.get(1).map(|a| a.display()).unwrap_or_default();
1498        if name.is_empty() {
1499            return Err(VmError::Thrown(VmValue::String(Rc::from(
1500                "mcp_get_prompt: prompt name is required",
1501            ))));
1502        }
1503
1504        let arguments = match args.get(2) {
1505            Some(VmValue::Dict(d)) => {
1506                let obj: serde_json::Map<String, serde_json::Value> = d
1507                    .iter()
1508                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1509                    .collect();
1510                serde_json::Value::Object(obj)
1511            }
1512            _ => serde_json::json!({}),
1513        };
1514
1515        let result = client
1516            .call(
1517                "prompts/get",
1518                serde_json::json!({
1519                    "name": name,
1520                    "arguments": arguments,
1521                }),
1522            )
1523            .await?;
1524
1525        Ok(json_to_vm_value(&result))
1526    });
1527}
1528
1529fn mcp_roots_builtin(_args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
1530    Ok(VmValue::List(Rc::new(
1531        current_mcp_roots()
1532            .iter()
1533            .map(|root| json_to_vm_value(&root.script_json()))
1534            .collect(),
1535    )))
1536}
1537
1538fn register_harn_mcp_namespace(vm: &mut Vm) {
1539    let mcp_namespace = VmValue::Dict(Rc::new(BTreeMap::from([
1540        (
1541            "_namespace".to_string(),
1542            VmValue::String(Rc::from("harn.mcp")),
1543        ),
1544        (
1545            "roots".to_string(),
1546            VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1547        ),
1548    ])));
1549    vm.set_global(
1550        "harn",
1551        VmValue::Dict(Rc::new(BTreeMap::from([
1552            ("_namespace".to_string(), VmValue::String(Rc::from("harn"))),
1553            (
1554                "mcp_roots".to_string(),
1555                VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1556            ),
1557            ("mcp".to_string(), mcp_namespace),
1558        ]))),
1559    );
1560}
1561
1562#[cfg(test)]
1563mod tests {
1564    use super::*;
1565    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1566    use tokio::net::{TcpListener, TcpStream};
1567    use tokio::sync::mpsc;
1568
1569    #[tokio::test(flavor = "current_thread")]
1570    async fn http_get_stream_dispatches_inbound_elicitation_response() {
1571        tokio::task::LocalSet::new()
1572            .run_until(async {
1573                let (base_url, mut responses) = spawn_eliciting_http_mcp_server().await;
1574                let spec = McpServerSpec {
1575                    name: "mock-http".to_string(),
1576                    transport: McpTransport::Http,
1577                    command: String::new(),
1578                    args: Vec::new(),
1579                    env: BTreeMap::new(),
1580                    url: format!("{base_url}/mcp"),
1581                    auth_token: None,
1582                    protocol_version: None,
1583                    proxy_server_name: None,
1584                };
1585
1586                let handle = connect_mcp_server_from_spec(&spec).await.unwrap();
1587                let response = tokio::time::timeout(MCP_TIMEOUT, responses.recv())
1588                    .await
1589                    .expect("timed out waiting for elicitation response POST")
1590                    .expect("mock server closed before receiving elicitation response");
1591
1592                assert_eq!(response["id"], serde_json::json!(99));
1593                assert_eq!(
1594                    response["result"]["action"],
1595                    serde_json::json!("decline"),
1596                    "without a host bridge, inbound elicitation should decline cleanly"
1597                );
1598                handle.disconnect().await.unwrap();
1599            })
1600            .await;
1601    }
1602
1603    async fn spawn_eliciting_http_mcp_server(
1604    ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
1605        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1606        let addr = listener.local_addr().unwrap();
1607        let (response_tx, response_rx) = mpsc::unbounded_channel();
1608
1609        tokio::spawn(async move {
1610            loop {
1611                let Ok((stream, _)) = listener.accept().await else {
1612                    break;
1613                };
1614                let response_tx = response_tx.clone();
1615                tokio::spawn(async move {
1616                    let _ = handle_mock_http_mcp_connection(stream, response_tx).await;
1617                });
1618            }
1619        });
1620
1621        (format!("http://{addr}"), response_rx)
1622    }
1623
1624    async fn spawn_recording_http_mcp_server(
1625    ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
1626        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1627        let addr = listener.local_addr().unwrap();
1628        let (request_tx, request_rx) = mpsc::unbounded_channel();
1629
1630        tokio::spawn(async move {
1631            loop {
1632                let Ok((mut stream, _)) = listener.accept().await else {
1633                    break;
1634                };
1635                let request_tx = request_tx.clone();
1636                tokio::spawn(async move {
1637                    let Ok((_request_line, _headers, body)) = read_http_request(&mut stream).await
1638                    else {
1639                        return;
1640                    };
1641                    if let Ok(request) = serde_json::from_slice::<serde_json::Value>(&body) {
1642                        let _ = request_tx.send(request);
1643                    }
1644                    let _ = write_http_empty(&mut stream, "202 Accepted").await;
1645                });
1646            }
1647        });
1648
1649        (format!("http://{addr}"), request_rx)
1650    }
1651
1652    async fn handle_mock_http_mcp_connection(
1653        mut stream: TcpStream,
1654        response_tx: mpsc::UnboundedSender<serde_json::Value>,
1655    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1656        let (request_line, headers, body) = read_http_request(&mut stream).await?;
1657        if request_line.starts_with("GET ") {
1658            let response = concat!(
1659                "HTTP/1.1 200 OK\r\n",
1660                "content-type: text/event-stream\r\n",
1661                "cache-control: no-cache\r\n",
1662                "\r\n",
1663                "id: prime\r\n",
1664                "data: \r\n",
1665                "\r\n",
1666                "id: elicit-1\r\n",
1667                "event: message\r\n",
1668                "data: {\"jsonrpc\":\"2.0\",\"id\":99,\"method\":\"elicitation/create\",\"params\":{\"message\":\"Need input\",\"requestedSchema\":{\"type\":\"object\",\"properties\":{}}}}\r\n",
1669                "\r\n"
1670            );
1671            stream.write_all(response.as_bytes()).await?;
1672            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
1673            return Ok(());
1674        }
1675
1676        let request: serde_json::Value = serde_json::from_slice(&body)?;
1677        let method = request.get("method").and_then(|value| value.as_str());
1678        match method {
1679            Some("initialize") => {
1680                write_http_json(
1681                    &mut stream,
1682                    "200 OK",
1683                    &[("MCP-Session-Id", "test-session")],
1684                    serde_json::json!({
1685                        "jsonrpc": "2.0",
1686                        "id": request["id"].clone(),
1687                        "result": {
1688                            "protocolVersion": PROTOCOL_VERSION,
1689                            "capabilities": {
1690                                "elicitation": {},
1691                                "tools": {}
1692                            },
1693                            "serverInfo": {
1694                                "name": "mock",
1695                                "version": "0.0.0"
1696                            }
1697                        }
1698                    }),
1699                )
1700                .await?;
1701            }
1702            Some("notifications/initialized") => {
1703                write_http_empty(&mut stream, "202 Accepted").await?;
1704            }
1705            _ if request.get("result").is_some() || request.get("error").is_some() => {
1706                assert_eq!(
1707                    headers.get("mcp-session-id").map(String::as_str),
1708                    Some("test-session")
1709                );
1710                let _ = response_tx.send(request);
1711                write_http_empty(&mut stream, "202 Accepted").await?;
1712            }
1713            _ => {
1714                write_http_json(
1715                    &mut stream,
1716                    "200 OK",
1717                    &[],
1718                    serde_json::json!({
1719                        "jsonrpc": "2.0",
1720                        "id": request["id"].clone(),
1721                        "result": {}
1722                    }),
1723                )
1724                .await?;
1725            }
1726        }
1727        Ok(())
1728    }
1729
1730    async fn read_http_request(
1731        stream: &mut TcpStream,
1732    ) -> Result<(String, BTreeMap<String, String>, Vec<u8>), Box<dyn std::error::Error + Send + Sync>>
1733    {
1734        let mut buffer = Vec::new();
1735        loop {
1736            let mut chunk = [0; 1024];
1737            let bytes = stream.read(&mut chunk).await?;
1738            if bytes == 0 {
1739                break;
1740            }
1741            buffer.extend_from_slice(&chunk[..bytes]);
1742            if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
1743                break;
1744            }
1745        }
1746        let header_end = buffer
1747            .windows(4)
1748            .position(|window| window == b"\r\n\r\n")
1749            .ok_or("missing HTTP header terminator")?;
1750        let header_text = String::from_utf8(buffer[..header_end].to_vec())?;
1751        let mut lines = header_text.lines();
1752        let request_line = lines.next().unwrap_or_default().to_string();
1753        let mut headers = BTreeMap::new();
1754        for line in lines {
1755            if let Some((name, value)) = line.split_once(':') {
1756                headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
1757            }
1758        }
1759        let content_length = headers
1760            .get("content-length")
1761            .and_then(|value| value.parse::<usize>().ok())
1762            .unwrap_or(0);
1763        let mut body = buffer[header_end + 4..].to_vec();
1764        while body.len() < content_length {
1765            let mut chunk = vec![0; content_length - body.len()];
1766            let bytes = stream.read(&mut chunk).await?;
1767            if bytes == 0 {
1768                break;
1769            }
1770            body.extend_from_slice(&chunk[..bytes]);
1771        }
1772        body.truncate(content_length);
1773        Ok((request_line, headers, body))
1774    }
1775
1776    async fn write_http_json(
1777        stream: &mut TcpStream,
1778        status: &str,
1779        headers: &[(&str, &str)],
1780        body: serde_json::Value,
1781    ) -> Result<(), std::io::Error> {
1782        let body = serde_json::to_string(&body).unwrap();
1783        let mut response = format!(
1784            "HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\n",
1785            body.len()
1786        );
1787        for (name, value) in headers {
1788            response.push_str(name);
1789            response.push_str(": ");
1790            response.push_str(value);
1791            response.push_str("\r\n");
1792        }
1793        response.push_str("\r\n");
1794        response.push_str(&body);
1795        stream.write_all(response.as_bytes()).await
1796    }
1797
1798    async fn write_http_empty(stream: &mut TcpStream, status: &str) -> Result<(), std::io::Error> {
1799        let response = format!("HTTP/1.1 {status}\r\ncontent-length: 0\r\n\r\n");
1800        stream.write_all(response.as_bytes()).await
1801    }
1802
1803    #[test]
1804    fn test_vm_value_to_serde_string() {
1805        let val = VmValue::String(Rc::from("hello"));
1806        let json = vm_value_to_serde(&val);
1807        assert_eq!(json, serde_json::json!("hello"));
1808    }
1809
1810    #[test]
1811    fn test_vm_value_to_serde_dict() {
1812        let mut map = BTreeMap::new();
1813        map.insert("key".to_string(), VmValue::Int(42));
1814        let val = VmValue::Dict(Rc::new(map));
1815        let json = vm_value_to_serde(&val);
1816        assert_eq!(json, serde_json::json!({"key": 42}));
1817    }
1818
1819    #[test]
1820    fn test_vm_value_to_serde_list() {
1821        let val = VmValue::List(Rc::new(vec![VmValue::Int(1), VmValue::Int(2)]));
1822        let json = vm_value_to_serde(&val);
1823        assert_eq!(json, serde_json::json!([1, 2]));
1824    }
1825
1826    #[test]
1827    fn test_extract_content_text_single() {
1828        let result = serde_json::json!({
1829            "content": [{"type": "text", "text": "hello world"}],
1830            "isError": false
1831        });
1832        assert_eq!(extract_content_text(&result), "hello world");
1833    }
1834
1835    #[test]
1836    fn test_extract_content_text_multiple() {
1837        let result = serde_json::json!({
1838            "content": [
1839                {"type": "text", "text": "first"},
1840                {"type": "text", "text": "second"}
1841            ],
1842            "isError": false
1843        });
1844        assert_eq!(extract_content_text(&result), "first\nsecond");
1845    }
1846
1847    #[test]
1848    fn test_extract_content_text_fallback_json() {
1849        let result = serde_json::json!({
1850            "content": [{"type": "image", "data": "abc"}],
1851            "isError": false
1852        });
1853        let output = extract_content_text(&result);
1854        assert!(output.contains("image"));
1855    }
1856
1857    #[tokio::test(flavor = "current_thread")]
1858    async fn test_parse_sse_jsonrpc_body_uses_matching_jsonrpc_response() {
1859        let inner = HttpMcpClientInner {
1860            client: reqwest::Client::new(),
1861            url: "http://127.0.0.1/mcp".to_string(),
1862            auth_token: None,
1863            protocol_version: PROTOCOL_VERSION.to_string(),
1864            session_id: None,
1865            next_id: 1,
1866            proxy_server_name: None,
1867            get_stream_task: None,
1868        };
1869        let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
1870        let parsed = parse_sse_jsonrpc_body(&inner, "mock", body, Some(1))
1871            .await
1872            .unwrap();
1873        assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
1874    }
1875
1876    #[test]
1877    fn client_rejects_unadvertised_server_to_client_requests() {
1878        let unknown = client_request_rejection(&serde_json::json!({
1879            "jsonrpc": "2.0",
1880            "id": "custom-1",
1881            "method": "custom/method",
1882            "params": {}
1883        }))
1884        .expect("rejection");
1885        assert_eq!(unknown["error"]["code"], serde_json::json!(-32601));
1886        assert!(unknown["error"].get("data").is_none());
1887    }
1888
1889    #[test]
1890    fn current_mcp_roots_prefers_project_root_over_child_cwd() {
1891        let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
1892        let child = root.join("nested");
1893        std::fs::create_dir_all(&child).unwrap();
1894        std::fs::write(root.join("harn.toml"), "[package]\nname = \"roots\"\n").unwrap();
1895
1896        crate::stdlib::process::set_thread_execution_context(Some(
1897            crate::orchestration::RunExecutionRecord {
1898                cwd: Some(child.to_string_lossy().into_owned()),
1899                source_dir: Some(child.to_string_lossy().into_owned()),
1900                ..Default::default()
1901            },
1902        ));
1903
1904        let roots = current_mcp_roots();
1905        let expected_root = std::fs::canonicalize(&root).unwrap();
1906        assert_eq!(roots.len(), 1);
1907        assert_eq!(roots[0].path, expected_root.to_string_lossy());
1908        assert!(roots[0].uri.starts_with("file://"));
1909        assert_eq!(
1910            roots[0].name,
1911            expected_root.file_name().unwrap().to_string_lossy()
1912        );
1913
1914        crate::stdlib::process::reset_process_state();
1915        let _ = std::fs::remove_dir_all(&root);
1916    }
1917
1918    #[tokio::test(flavor = "current_thread")]
1919    async fn handle_inbound_routes_roots_list() {
1920        let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
1921        std::fs::create_dir_all(&root).unwrap();
1922        crate::stdlib::process::set_thread_execution_context(Some(
1923            crate::orchestration::RunExecutionRecord {
1924                cwd: Some(root.to_string_lossy().into_owned()),
1925                ..Default::default()
1926            },
1927        ));
1928
1929        let request = serde_json::json!({
1930            "jsonrpc": "2.0",
1931            "id": "roots-1",
1932            "method": crate::mcp_protocol::METHOD_ROOTS_LIST,
1933        });
1934        let response = handle_inbound_client_request("mock", &request)
1935            .await
1936            .expect("roots/list should produce a response");
1937        let expected_root = std::fs::canonicalize(&root).unwrap();
1938        assert_eq!(response["id"], serde_json::json!("roots-1"));
1939        assert_eq!(response["result"]["roots"].as_array().unwrap().len(), 1);
1940        assert_eq!(
1941            response["result"]["roots"][0]["uri"],
1942            serde_json::json!(url::Url::from_file_path(&expected_root)
1943                .unwrap()
1944                .to_string())
1945        );
1946
1947        crate::stdlib::process::reset_process_state();
1948        let _ = std::fs::remove_dir_all(&root);
1949    }
1950
1951    #[tokio::test(flavor = "current_thread")]
1952    async fn roots_list_changed_notification_is_sent_once_per_snapshot() {
1953        tokio::task::LocalSet::new()
1954            .run_until(async {
1955                let (base_url, mut requests) = spawn_recording_http_mcp_server().await;
1956                let handle = VmMcpClientHandle {
1957                    name: "mock-http".to_string(),
1958                    inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
1959                        client: reqwest::Client::new(),
1960                        url: format!("{base_url}/mcp"),
1961                        auth_token: None,
1962                        protocol_version: PROTOCOL_VERSION.to_string(),
1963                        session_id: None,
1964                        next_id: 1,
1965                        proxy_server_name: None,
1966                        get_stream_task: None,
1967                    })))),
1968                    last_roots: Arc::new(Mutex::new(Vec::new())),
1969                    initialize_result: Arc::new(Mutex::new(None)),
1970                };
1971
1972                handle.notify_roots_list_changed_if_needed().await.unwrap();
1973                let notification = tokio::time::timeout(MCP_TIMEOUT, requests.recv())
1974                    .await
1975                    .expect("timed out waiting for roots notification")
1976                    .expect("mock server closed before notification");
1977                assert_eq!(
1978                    notification["method"],
1979                    serde_json::json!(crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION)
1980                );
1981
1982                handle.notify_roots_list_changed_if_needed().await.unwrap();
1983                assert!(
1984                    tokio::time::timeout(std::time::Duration::from_millis(50), requests.recv())
1985                        .await
1986                        .is_err(),
1987                    "unchanged roots should not send another notification"
1988                );
1989            })
1990            .await;
1991    }
1992
1993    #[tokio::test(flavor = "current_thread")]
1994    async fn handle_inbound_routes_sampling_to_dispatcher() {
1995        // Confirms `sampling/createMessage` is routed to
1996        // `mcp_sampling::dispatch_inbound_sampling` rather than the
1997        // generic rejection path. With no host bridge installed, the
1998        // dispatcher declines with the structured `mcp.samplingDeclined`
1999        // error envelope — proving the request reached the right
2000        // handler instead of being bounced as `Method not found`.
2001        let request = serde_json::json!({
2002            "jsonrpc": "2.0",
2003            "id": 42,
2004            "method": crate::mcp_sampling::SAMPLING_METHOD,
2005            "params": {
2006                "messages": [
2007                    {"role": "user", "content": {"type": "text", "text": "ping"}}
2008                ],
2009                "maxTokens": 4,
2010            },
2011        });
2012        let response = handle_inbound_client_request("mock", &request)
2013            .await
2014            .expect("sampling should produce a response");
2015        assert_eq!(response["id"], serde_json::json!(42));
2016        assert_eq!(
2017            response["error"]["data"]["type"],
2018            serde_json::json!("mcp.samplingDeclined")
2019        );
2020    }
2021}