Skip to main content

harn_vm/
mcp.rs

1//! MCP (Model Context Protocol) client for connecting to external tool servers.
2//!
3//! Supports stdio transport and streamable HTTP-style request/response transport.
4
5use std::collections::BTreeMap;
6use std::path::{Path, PathBuf};
7use std::rc::Rc;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use reqwest_eventsource::{Event as SseEvent, EventSource};
12use serde::Deserialize;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::{Child, ChildStdin, ChildStdout};
15use tokio::sync::Mutex;
16
17use crate::stdlib::json_to_vm_value;
18use crate::value::{VmError, VmValue};
19use crate::vm::Vm;
20
/// MCP protocol version we negotiate by default.
///
/// Sent as `protocolVersion` in the `initialize` request and used as the
/// initial `MCP-Protocol-Version` header value when a server spec does not
/// provide its own `protocol_version`.
const PROTOCOL_VERSION: &str = "2025-11-25";

/// Default timeout for MCP requests (60 seconds).
///
/// Applied to every HTTP request and to each individual line read from a
/// stdio server while a call is waiting for its response.
const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
26
/// Transport used to talk to an MCP server.
///
/// Deserialized from the lowercase variant name (`"stdio"` or `"http"`).
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum McpTransport {
    /// Talk to the server over a child process's stdin/stdout.
    Stdio,
    /// Talk to the server over HTTP requests.
    Http,
}
33
/// Deserializable description of one MCP server to connect to.
///
/// Every field except `name` is optional in the serialized form and falls
/// back to its default (`transport` defaults to stdio).
#[derive(Clone, Debug, Deserialize)]
pub struct McpServerSpec {
    /// Name of the server; used as the client handle name for HTTP connections.
    pub name: String,
    /// Which transport to use; defaults to [`McpTransport::Stdio`].
    #[serde(default = "default_transport")]
    transport: McpTransport,
    /// Executable to launch — presumably for the stdio transport; the caller
    /// that consumes this field is not visible here.
    #[serde(default)]
    pub command: String,
    /// Arguments for `command` (presumably stdio only; confirm against caller).
    #[serde(default)]
    pub args: Vec<String>,
    /// Extra environment variables (presumably for the spawned stdio server;
    /// confirm against caller).
    #[serde(default)]
    pub env: BTreeMap<String, String>,
    /// Endpoint URL for the HTTP transport.
    #[serde(default)]
    pub url: String,
    /// Optional token sent as `Authorization: Bearer <token>` on HTTP requests.
    #[serde(default)]
    pub auth_token: Option<String>,
    /// Optional override for the initial `MCP-Protocol-Version` header value;
    /// `PROTOCOL_VERSION` is used when absent.
    #[serde(default)]
    pub protocol_version: Option<String>,
    /// When set, every HTTP JSON-RPC payload is wrapped with a `serverName`
    /// field carrying this value.
    #[serde(default)]
    pub proxy_server_name: Option<String>,
}
54
/// Serde default for `McpServerSpec::transport`: stdio.
fn default_transport() -> McpTransport {
    McpTransport::Stdio
}
58
/// Internal state for an MCP client connection.
///
/// One variant per supported transport; the handle dispatches calls and
/// notifications on this enum.
enum McpClientInner {
    /// State for a server running as a child process.
    Stdio(StdioMcpClientInner),
    /// State for a server reached over HTTP.
    Http(HttpMcpClientInner),
}
64
/// Connection state for an MCP server running as a child process.
struct StdioMcpClientInner {
    /// The spawned server process; killed on disconnect and on drop.
    child: Child,
    /// Write end used to send newline-delimited JSON-RPC messages.
    stdin: ChildStdin,
    /// Buffered read end from which server messages are read line by line.
    reader: BufReader<ChildStdout>,
    /// Id assigned to the next outgoing JSON-RPC request.
    next_id: u64,
}
71
/// Connection state for an MCP server reached over HTTP.
struct HttpMcpClientInner {
    /// HTTP client used for all requests to this server.
    client: reqwest::Client,
    /// Endpoint that receives both POSTed JSON-RPC messages and the GET stream.
    url: String,
    /// Optional bearer token sent in the `Authorization` header.
    auth_token: Option<String>,
    /// Value sent as `MCP-Protocol-Version`; updated from response headers.
    protocol_version: String,
    /// Session id taken from the `MCP-Session-Id` response header, if any.
    session_id: Option<String>,
    /// Id assigned to the next outgoing JSON-RPC request.
    next_id: u64,
    /// When set, payloads are wrapped with a `serverName` field.
    proxy_server_name: Option<String>,
    /// Background task reading the server's GET event stream, if running.
    get_stream_task: Option<tokio::task::JoinHandle<()>>,
}
82
/// A filesystem root advertised to MCP servers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct McpRoot {
    /// Filesystem path of the root as a (lossily converted) string.
    path: String,
    /// URL form of the path, built with `url::Url::from_file_path`.
    uri: String,
    /// Display name: the path's final component, or the whole path if none.
    name: String,
}
89
90impl McpRoot {
91    fn protocol_json(&self) -> serde_json::Value {
92        serde_json::json!({
93            "uri": self.uri,
94            "name": self.name,
95        })
96    }
97
98    fn script_json(&self) -> serde_json::Value {
99        serde_json::json!({
100            "uri": self.uri,
101            "name": self.name,
102            "path": self.path,
103        })
104    }
105}
106
107impl HttpMcpClientInner {
108    fn abort_get_stream(&mut self) {
109        if let Some(task) = self.get_stream_task.take() {
110            task.abort();
111        }
112    }
113}
114
/// Requests termination of the child server process when the state is dropped.
impl Drop for StdioMcpClientInner {
    fn drop(&mut self) {
        // Best effort: the kill result is deliberately ignored.
        let _ = self.child.start_kill();
    }
}
120
/// Stops the background GET stream task when the HTTP state is dropped.
impl Drop for HttpMcpClientInner {
    fn drop(&mut self) {
        self.abort_get_stream();
    }
}
126
/// Handle to an MCP client connection, stored in VmValue.
///
/// Cloning the handle shares the same underlying connection state.
#[derive(Clone)]
pub struct VmMcpClientHandle {
    /// Name used to identify the server (and shown by the `Debug` impl).
    pub name: String,
    /// Transport state; `None` once the client has been disconnected.
    inner: Arc<Mutex<Option<McpClientInner>>>,
    /// Roots as of the last successful roots-changed notification.
    last_roots: Arc<Mutex<Vec<McpRoot>>>,
    /// Result of the `initialize` request, once it has completed.
    pub(crate) initialize_result: Arc<Mutex<Option<serde_json::Value>>>,
}
135
136impl std::fmt::Debug for VmMcpClientHandle {
137    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138        write!(f, "McpClient({})", self.name)
139    }
140}
141
142impl VmMcpClientHandle {
143    pub(crate) async fn call(
144        &self,
145        method: &str,
146        params: serde_json::Value,
147    ) -> Result<serde_json::Value, VmError> {
148        if method != "initialize" {
149            self.notify_roots_list_changed_if_needed().await?;
150        }
151        let mut guard = self.inner.lock().await;
152        let inner = guard
153            .as_mut()
154            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
155
156        match inner {
157            McpClientInner::Stdio(inner) => stdio_call(inner, &self.name, method, params).await,
158            McpClientInner::Http(inner) => http_call(inner, &self.name, method, params).await,
159        }
160    }
161
162    async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
163        let mut guard = self.inner.lock().await;
164        let inner = guard
165            .as_mut()
166            .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
167
168        match inner {
169            McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
170            McpClientInner::Http(inner) => http_notify(inner, &self.name, method, params).await,
171        }
172    }
173
174    pub(crate) async fn disconnect(&self) -> Result<(), VmError> {
175        let mut guard = self.inner.lock().await;
176        if let Some(inner) = guard.take() {
177            match inner {
178                McpClientInner::Stdio(mut inner) => {
179                    let _ = inner.child.kill().await;
180                }
181                McpClientInner::Http(mut inner) => {
182                    inner.abort_get_stream();
183                }
184            }
185        }
186        Ok(())
187    }
188
189    async fn notify_roots_list_changed_if_needed(&self) -> Result<(), VmError> {
190        let roots = current_mcp_roots();
191        let mut last_roots = self.last_roots.lock().await;
192        if *last_roots == roots {
193            return Ok(());
194        }
195
196        self.notify(
197            crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION,
198            serde_json::json!({}),
199        )
200        .await?;
201        *last_roots = roots;
202        Ok(())
203    }
204}
205
206async fn stdio_call(
207    inner: &mut StdioMcpClientInner,
208    server_name: &str,
209    method: &str,
210    params: serde_json::Value,
211) -> Result<serde_json::Value, VmError> {
212    let id = inner.next_id;
213    inner.next_id += 1;
214
215    let request = serde_json::json!({
216        "jsonrpc": "2.0",
217        "id": id,
218        "method": method,
219        "params": params,
220    });
221
222    let line = serde_json::to_string(&request)
223        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
224    inner
225        .stdin
226        .write_all(line.as_bytes())
227        .await
228        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
229    inner
230        .stdin
231        .write_all(b"\n")
232        .await
233        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
234    inner
235        .stdin
236        .flush()
237        .await
238        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
239
240    let mut line_buf = String::new();
241    loop {
242        line_buf.clear();
243        let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
244            .await
245            .map_err(|_| {
246                VmError::Runtime(format!(
247                    "MCP: server did not respond to '{method}' within {}s",
248                    MCP_TIMEOUT.as_secs()
249                ))
250            })?
251            .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
252
253        if bytes_read == 0 {
254            return Err(VmError::Runtime("MCP: server closed connection".into()));
255        }
256
257        let trimmed = line_buf.trim();
258        if trimmed.is_empty() {
259            continue;
260        }
261
262        let msg: serde_json::Value = match serde_json::from_str(trimmed) {
263            Ok(v) => v,
264            Err(_) => continue,
265        };
266
267        if msg.get("id").is_none() {
268            continue;
269        }
270
271        if msg["id"].as_u64() == Some(id)
272            && (msg.get("result").is_some() || msg.get("error").is_some())
273        {
274            return parse_jsonrpc_result(msg);
275        }
276
277        let response = match handle_inbound_client_request(server_name, &msg).await {
278            Some(response) => response,
279            None => continue,
280        };
281        let line = serde_json::to_string(&response)
282            .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
283        inner
284            .stdin
285            .write_all(line.as_bytes())
286            .await
287            .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
288        inner
289            .stdin
290            .write_all(b"\n")
291            .await
292            .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
293        inner
294            .stdin
295            .flush()
296            .await
297            .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
298    }
299}
300
301/// Handle a server-to-client request that arrived on the stream while
302/// we were waiting for a response. Returns `Some(response)` to send
303/// back to the server, or `None` if the message wasn't actually a
304/// request (e.g. an unknown notification we should ignore).
305async fn handle_inbound_client_request(
306    server_name: &str,
307    msg: &serde_json::Value,
308) -> Option<serde_json::Value> {
309    let method = msg.get("method").and_then(|value| value.as_str())?;
310    if method == crate::mcp_elicit::ELICITATION_METHOD {
311        return Some(crate::mcp_elicit::dispatch_inbound_elicitation(server_name, msg).await);
312    }
313    if method == crate::mcp_sampling::SAMPLING_METHOD {
314        return Some(crate::mcp_sampling::dispatch_inbound_sampling(server_name, msg).await);
315    }
316    if method == crate::mcp_protocol::METHOD_ROOTS_LIST {
317        let id = msg.get("id")?.clone();
318        return Some(harn_roots_list_response(id));
319    }
320    client_request_rejection(msg)
321}
322
323async fn stdio_notify(
324    inner: &mut StdioMcpClientInner,
325    method: &str,
326    params: serde_json::Value,
327) -> Result<(), VmError> {
328    let notification = serde_json::json!({
329        "jsonrpc": "2.0",
330        "method": method,
331        "params": params,
332    });
333
334    let line = serde_json::to_string(&notification)
335        .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
336    inner
337        .stdin
338        .write_all(line.as_bytes())
339        .await
340        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
341    inner
342        .stdin
343        .write_all(b"\n")
344        .await
345        .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
346    inner
347        .stdin
348        .flush()
349        .await
350        .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
351    Ok(())
352}
353
354async fn http_call(
355    inner: &mut HttpMcpClientInner,
356    server_name: &str,
357    method: &str,
358    params: serde_json::Value,
359) -> Result<serde_json::Value, VmError> {
360    let id = inner.next_id;
361    inner.next_id += 1;
362    send_http_request(inner, server_name, method, params, Some(id)).await
363}
364
365async fn http_notify(
366    inner: &mut HttpMcpClientInner,
367    server_name: &str,
368    method: &str,
369    params: serde_json::Value,
370) -> Result<(), VmError> {
371    let _ = send_http_request(inner, server_name, method, params, None).await?;
372    Ok(())
373}
374
375async fn send_http_request(
376    inner: &mut HttpMcpClientInner,
377    server_name: &str,
378    method: &str,
379    params: serde_json::Value,
380    id: Option<u64>,
381) -> Result<serde_json::Value, VmError> {
382    for attempt in 0..2 {
383        let response = send_http_request_once(inner, method, params.clone(), id).await?;
384
385        let status = response.status().as_u16();
386        let headers = response.headers().clone();
387        if let Some(protocol_version) = headers
388            .get("MCP-Protocol-Version")
389            .and_then(|v| v.to_str().ok())
390        {
391            inner.protocol_version = protocol_version.to_string();
392        }
393        if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
394            inner.session_id = Some(session_id.to_string());
395        }
396
397        if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
398            inner.session_id = None;
399            inner.abort_get_stream();
400            reinitialize_http_client(inner).await?;
401            continue;
402        }
403
404        if status == 401 {
405            return Err(VmError::Thrown(VmValue::String(Rc::from(
406                "MCP authorization required",
407            ))));
408        }
409
410        let body = response
411            .text()
412            .await
413            .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
414
415        if body.trim().is_empty() {
416            if status < 400 {
417                ensure_http_get_stream(inner, server_name);
418            }
419            return Ok(serde_json::Value::Null);
420        }
421
422        let msg = parse_http_response_body(inner, server_name, &body, status, id).await?;
423
424        if status >= 400 {
425            return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
426        }
427
428        ensure_http_get_stream(inner, server_name);
429        if id.is_none() {
430            return Ok(msg);
431        }
432        return parse_jsonrpc_result(msg);
433    }
434
435    Err(VmError::Runtime("MCP HTTP request failed".into()))
436}
437
438async fn send_http_request_once(
439    inner: &mut HttpMcpClientInner,
440    method: &str,
441    params: serde_json::Value,
442    id: Option<u64>,
443) -> Result<reqwest::Response, VmError> {
444    let mut payload = serde_json::json!({
445        "jsonrpc": "2.0",
446        "method": method,
447        "params": params,
448    });
449    if let Some(id) = id {
450        payload["id"] = serde_json::json!(id);
451    }
452    let payload = wrap_http_payload(payload, inner.proxy_server_name.as_deref());
453
454    let request = inner
455        .client
456        .post(&inner.url)
457        .header("Content-Type", "application/json")
458        .header("Accept", "application/json, text/event-stream")
459        .json(&payload);
460    let request = apply_http_headers(
461        request,
462        &inner.auth_token,
463        &inner.protocol_version,
464        inner.session_id.as_deref(),
465    );
466
467    request
468        .timeout(MCP_TIMEOUT)
469        .send()
470        .await
471        .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
472}
473
474fn ensure_http_get_stream(inner: &mut HttpMcpClientInner, server_name: &str) {
475    if server_name.is_empty() {
476        return;
477    }
478    if inner
479        .get_stream_task
480        .as_ref()
481        .is_some_and(|task| !task.is_finished())
482    {
483        return;
484    }
485
486    let config = HttpStreamConfig {
487        client: inner.client.clone(),
488        url: inner.url.clone(),
489        auth_token: inner.auth_token.clone(),
490        protocol_version: inner.protocol_version.clone(),
491        session_id: inner.session_id.clone(),
492        proxy_server_name: inner.proxy_server_name.clone(),
493        server_name: server_name.to_string(),
494    };
495    inner.get_stream_task = Some(tokio::task::spawn_local(run_http_get_stream(config)));
496}
497
/// Copy of the HTTP connection parameters, used by code that cannot hold a
/// borrow of the client state (the background GET stream task and replies
/// POSTed while handling inbound server requests).
#[derive(Clone)]
struct HttpStreamConfig {
    /// HTTP client to issue requests with.
    client: reqwest::Client,
    /// Server endpoint URL.
    url: String,
    /// Optional bearer token for the `Authorization` header.
    auth_token: Option<String>,
    /// Value for the `MCP-Protocol-Version` header.
    protocol_version: String,
    /// Value for the `MCP-Session-Id` header, if a session is established.
    session_id: Option<String>,
    /// When set, POSTed payloads are wrapped with a `serverName` field.
    proxy_server_name: Option<String>,
    /// Server name passed to the inbound-request handlers.
    server_name: String,
}
508
509async fn run_http_get_stream(config: HttpStreamConfig) {
510    let request = apply_http_headers(
511        config
512            .client
513            .get(&config.url)
514            .header("Accept", "text/event-stream"),
515        &config.auth_token,
516        &config.protocol_version,
517        config.session_id.as_deref(),
518    );
519    let Ok(mut stream) = EventSource::new(request) else {
520        return;
521    };
522
523    while let Some(event) = stream.next().await {
524        match event {
525            Ok(SseEvent::Open) => {}
526            Ok(SseEvent::Message(message)) => {
527                if message.data.trim().is_empty() {
528                    continue;
529                }
530                let Ok(msg) = serde_json::from_str::<serde_json::Value>(&message.data) else {
531                    tracing::debug!("MCP HTTP GET stream received non-JSON event");
532                    continue;
533                };
534                if let Some(response) =
535                    handle_inbound_client_request(&config.server_name, &msg).await
536                {
537                    let _ = post_http_jsonrpc_payload(&config, response).await;
538                }
539            }
540            Err(error) => {
541                tracing::debug!("MCP HTTP GET stream ended with error: {error}");
542                break;
543            }
544        }
545    }
546    stream.close();
547}
548
549async fn post_http_jsonrpc_payload(
550    config: &HttpStreamConfig,
551    payload: serde_json::Value,
552) -> Result<(), VmError> {
553    let payload = wrap_http_payload(payload, config.proxy_server_name.as_deref());
554    let request = config
555        .client
556        .post(&config.url)
557        .header("Content-Type", "application/json")
558        .header("Accept", "application/json, text/event-stream")
559        .json(&payload)
560        .timeout(MCP_TIMEOUT);
561    let request = apply_http_headers(
562        request,
563        &config.auth_token,
564        &config.protocol_version,
565        config.session_id.as_deref(),
566    );
567    let response = request
568        .send()
569        .await
570        .map_err(|e| VmError::Runtime(format!("MCP HTTP response POST error: {e}")))?;
571    if response.status().is_success() {
572        Ok(())
573    } else {
574        Err(VmError::Runtime(format!(
575            "MCP HTTP response POST returned {}",
576            response.status()
577        )))
578    }
579}
580
581fn apply_http_headers(
582    mut request: reqwest::RequestBuilder,
583    auth_token: &Option<String>,
584    protocol_version: &str,
585    session_id: Option<&str>,
586) -> reqwest::RequestBuilder {
587    request = request.header("MCP-Protocol-Version", protocol_version);
588    if let Some(token) = auth_token {
589        request = request.header("Authorization", format!("Bearer {token}"));
590    }
591    if let Some(session_id) = session_id {
592        request = request.header("MCP-Session-Id", session_id);
593    }
594    request
595}
596
597fn wrap_http_payload(
598    payload: serde_json::Value,
599    proxy_server_name: Option<&str>,
600) -> serde_json::Value {
601    let Some(proxy_server_name) = proxy_server_name else {
602        return payload;
603    };
604    let mut wrapped = serde_json::Map::new();
605    wrapped.insert(
606        "serverName".to_string(),
607        serde_json::Value::String(proxy_server_name.to_string()),
608    );
609    if let Some(object) = payload.as_object() {
610        for (key, value) in object {
611            wrapped.insert(key.clone(), value.clone());
612        }
613    }
614    serde_json::Value::Object(wrapped)
615}
616
617async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
618    let initialize = send_http_request_once(
619        inner,
620        "initialize",
621        serde_json::json!({
622            "protocolVersion": PROTOCOL_VERSION,
623            "capabilities": {
624                "elicitation": {},
625                "roots": {
626                    "listChanged": true,
627                },
628                "sampling": {},
629            },
630            "clientInfo": {
631                "name": "harn",
632                "version": env!("CARGO_PKG_VERSION"),
633            }
634        }),
635        Some(0),
636    )
637    .await?;
638    if let Some(protocol_version) = initialize
639        .headers()
640        .get("MCP-Protocol-Version")
641        .and_then(|v| v.to_str().ok())
642    {
643        inner.protocol_version = protocol_version.to_string();
644    }
645    if let Some(session_id) = initialize
646        .headers()
647        .get("MCP-Session-Id")
648        .and_then(|v| v.to_str().ok())
649    {
650        inner.session_id = Some(session_id.to_string());
651    }
652    let status = initialize.status().as_u16();
653    let body = initialize
654        .text()
655        .await
656        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
657    let msg = parse_http_response_body(inner, "", &body, status, Some(0)).await?;
658    if status >= 400 {
659        return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
660    }
661    let _ = parse_jsonrpc_result(msg)?;
662    let response = send_http_request_once(
663        inner,
664        "notifications/initialized",
665        serde_json::json!({}),
666        None,
667    )
668    .await?;
669    let status = response.status().as_u16();
670    if let Some(protocol_version) = response
671        .headers()
672        .get("MCP-Protocol-Version")
673        .and_then(|v| v.to_str().ok())
674    {
675        inner.protocol_version = protocol_version.to_string();
676    }
677    if let Some(session_id) = response
678        .headers()
679        .get("MCP-Session-Id")
680        .and_then(|v| v.to_str().ok())
681    {
682        inner.session_id = Some(session_id.to_string());
683    }
684    let body = response
685        .text()
686        .await
687        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
688    if body.trim().is_empty() || status < 400 {
689        return Ok(());
690    }
691    let msg = parse_http_response_body(inner, "", &body, status, None).await?;
692    Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
693}
694
695async fn parse_http_response_body(
696    inner: &HttpMcpClientInner,
697    server_name: &str,
698    body: &str,
699    status: u16,
700    request_id: Option<u64>,
701) -> Result<serde_json::Value, VmError> {
702    if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
703        return parse_sse_jsonrpc_body(inner, server_name, body, request_id).await;
704    }
705    serde_json::from_str(body).map_err(|e| {
706        VmError::Runtime(format!(
707            "MCP HTTP response parse error (status {status}): {e}"
708        ))
709    })
710}
711
712async fn parse_sse_jsonrpc_body(
713    inner: &HttpMcpClientInner,
714    server_name: &str,
715    body: &str,
716    request_id: Option<u64>,
717) -> Result<serde_json::Value, VmError> {
718    let mut current_data = Vec::new();
719    let mut messages = Vec::new();
720
721    for line in body.lines() {
722        if line.is_empty() {
723            if !current_data.is_empty() {
724                messages.push(current_data.join("\n"));
725                current_data.clear();
726            }
727            continue;
728        }
729        if let Some(data) = line.strip_prefix("data:") {
730            current_data.push(data.trim_start().to_string());
731        }
732    }
733    if !current_data.is_empty() {
734        messages.push(current_data.join("\n"));
735    }
736
737    let config = HttpStreamConfig {
738        client: inner.client.clone(),
739        url: inner.url.clone(),
740        auth_token: inner.auth_token.clone(),
741        protocol_version: inner.protocol_version.clone(),
742        session_id: inner.session_id.clone(),
743        proxy_server_name: inner.proxy_server_name.clone(),
744        server_name: server_name.to_string(),
745    };
746
747    let mut fallback = None;
748    for message in messages {
749        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
750            if request_id.is_some()
751                && value["id"].as_u64() == request_id
752                && (value.get("result").is_some() || value.get("error").is_some())
753            {
754                return Ok(value);
755            }
756            if let Some(response) = handle_inbound_client_request(server_name, &value).await {
757                let _ = post_http_jsonrpc_payload(&config, response).await;
758                continue;
759            }
760            if value.get("result").is_some() || value.get("error").is_some() {
761                fallback = Some(value);
762            }
763        }
764    }
765
766    fallback.ok_or_else(|| {
767        VmError::Runtime(
768            "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
769        )
770    })
771}
772
773fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
774    if let Some(error) = msg.get("error") {
775        return Err(jsonrpc_error_to_vm_error(error));
776    }
777    Ok(msg
778        .get("result")
779        .cloned()
780        .unwrap_or(serde_json::Value::Null))
781}
782
783fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
784    let message = error
785        .get("message")
786        .and_then(|v| v.as_str())
787        .unwrap_or("Unknown MCP error");
788    let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
789    VmError::Thrown(VmValue::String(Rc::from(format!(
790        "MCP error ({code}): {message}"
791    ))))
792}
793
794fn client_request_rejection(msg: &serde_json::Value) -> Option<serde_json::Value> {
795    let request_id = msg.get("id")?.clone();
796    let method = msg.get("method").and_then(|value| value.as_str())?;
797    crate::mcp_protocol::unsupported_latest_spec_method_response(request_id.clone(), method)
798        .or_else(|| {
799            Some(crate::jsonrpc::error_response(
800                request_id,
801                -32601,
802                &format!("Method not found: {method}"),
803            ))
804        })
805}
806
807fn harn_roots_list_response(id: serde_json::Value) -> serde_json::Value {
808    crate::jsonrpc::response(
809        id,
810        serde_json::json!({
811            "roots": current_mcp_roots()
812                .iter()
813                .map(McpRoot::protocol_json)
814                .collect::<Vec<_>>()
815        }),
816    )
817}
818
819pub(crate) fn current_mcp_roots() -> Vec<McpRoot> {
820    compact_root_paths(current_mcp_root_candidates())
821        .into_iter()
822        .filter_map(|path| {
823            let uri = url::Url::from_file_path(&path).ok()?.to_string();
824            Some(McpRoot {
825                name: root_display_name(&path),
826                path: path.to_string_lossy().into_owned(),
827                uri,
828            })
829        })
830        .collect()
831}
832
833fn current_mcp_root_candidates() -> Vec<PathBuf> {
834    let mut candidates = Vec::new();
835    if let Some(context) = crate::stdlib::process::current_execution_context() {
836        if let Some(path) = non_empty_path(context.worktree_path.as_deref()) {
837            candidates.push(path);
838        }
839        if let Some(cwd) = non_empty_path(context.cwd.as_deref()) {
840            push_project_root_or_path(&mut candidates, cwd);
841        }
842        if let Some(source_dir) = non_empty_path(context.source_dir.as_deref()) {
843            push_project_root_or_path(&mut candidates, source_dir);
844        }
845    } else {
846        push_project_root_or_path(
847            &mut candidates,
848            crate::stdlib::process::execution_root_path(),
849        );
850        push_project_root_or_path(&mut candidates, crate::stdlib::process::source_root_path());
851    }
852
853    if candidates.is_empty() {
854        candidates.push(crate::stdlib::process::execution_root_path());
855    }
856    candidates
857}
858
/// Converts an optional string into a path, treating a missing, empty or
/// whitespace-only string as `None`. The text itself is not trimmed.
fn non_empty_path(raw: Option<&str>) -> Option<PathBuf> {
    match raw {
        Some(text) if !text.trim().is_empty() => Some(PathBuf::from(text)),
        _ => None,
    }
}
863
864fn push_project_root_or_path(candidates: &mut Vec<PathBuf>, path: PathBuf) {
865    let normalized = crate::stdlib::process::normalize_context_path(&path);
866    match crate::stdlib::process::find_project_root(&normalized) {
867        Some(root) => candidates.push(root),
868        None => candidates.push(normalized),
869    }
870}
871
872fn compact_root_paths(paths: Vec<PathBuf>) -> Vec<PathBuf> {
873    let mut normalized = paths
874        .into_iter()
875        .map(normalize_root_path)
876        .collect::<Vec<_>>();
877    normalized.sort_by_key(|path| {
878        (
879            path.components().count(),
880            path.to_string_lossy().to_string(),
881        )
882    });
883
884    let mut roots: Vec<PathBuf> = Vec::new();
885    for path in normalized {
886        if roots
887            .iter()
888            .any(|existing| path == *existing || path.starts_with(existing))
889        {
890            continue;
891        }
892        roots.push(path);
893    }
894    roots
895}
896
897fn normalize_root_path(path: PathBuf) -> PathBuf {
898    let absolute = crate::stdlib::process::normalize_context_path(&path);
899    std::fs::canonicalize(&absolute).unwrap_or(absolute)
900}
901
/// Human-readable name for a root: its final path component when that is
/// valid, non-empty UTF-8, otherwise the whole path as displayed.
fn root_display_name(path: &Path) -> String {
    match path.file_name().and_then(|component| component.to_str()) {
        Some(name) if !name.is_empty() => name.to_owned(),
        _ => path.display().to_string(),
    }
}
909
910async fn mcp_connect_stdio_impl(
911    command: &str,
912    args: &[String],
913    env: &BTreeMap<String, String>,
914) -> Result<VmMcpClientHandle, VmError> {
915    let mut cmd = tokio::process::Command::new(command);
916    cmd.args(args)
917        .stdin(std::process::Stdio::piped())
918        .stdout(std::process::Stdio::piped())
919        .stderr(std::process::Stdio::inherit())
920        .envs(env);
921    cmd.kill_on_drop(true);
922
923    let mut child = cmd.spawn().map_err(|e| {
924        VmError::Thrown(VmValue::String(Rc::from(format!(
925            "mcp_connect: failed to spawn '{command}': {e}"
926        ))))
927    })?;
928
929    let stdin = child
930        .stdin
931        .take()
932        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
933    let stdout = child
934        .stdout
935        .take()
936        .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
937
938    let handle = VmMcpClientHandle {
939        name: command.to_string(),
940        inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
941            StdioMcpClientInner {
942                child,
943                stdin,
944                reader: BufReader::new(stdout),
945                next_id: 1,
946            },
947        )))),
948        last_roots: Arc::new(Mutex::new(Vec::new())),
949        initialize_result: Arc::new(Mutex::new(None)),
950    };
951
952    initialize_client(&handle).await?;
953    Ok(handle)
954}
955
956async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
957    let client = reqwest::Client::builder()
958        .build()
959        .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
960
961    let handle = VmMcpClientHandle {
962        name: spec.name.clone(),
963        inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
964            client,
965            url: spec.url.clone(),
966            auth_token: spec.auth_token.clone(),
967            protocol_version: spec
968                .protocol_version
969                .clone()
970                .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
971            session_id: None,
972            next_id: 1,
973            proxy_server_name: spec.proxy_server_name.clone(),
974            get_stream_task: None,
975        })))),
976        last_roots: Arc::new(Mutex::new(Vec::new())),
977        initialize_result: Arc::new(Mutex::new(None)),
978    };
979
980    initialize_client(&handle).await?;
981    Ok(handle)
982}
983
984async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
985    let initialize_result = handle
986        .call(
987            "initialize",
988            serde_json::json!({
989                "protocolVersion": PROTOCOL_VERSION,
990                "capabilities": {
991                    "elicitation": {},
992                    "roots": {
993                        "listChanged": true,
994                    },
995                    "sampling": {},
996                },
997                "clientInfo": {
998                    "name": "harn",
999                    "version": env!("CARGO_PKG_VERSION"),
1000                }
1001            }),
1002        )
1003        .await?;
1004    *handle.initialize_result.lock().await = Some(initialize_result);
1005
1006    handle
1007        .notify("notifications/initialized", serde_json::json!({}))
1008        .await?;
1009
1010    Ok(())
1011}
1012
1013pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
1014    match val {
1015        VmValue::String(s) => serde_json::Value::String(s.to_string()),
1016        VmValue::Int(n) => serde_json::json!(*n),
1017        VmValue::Float(n) => serde_json::json!(*n),
1018        VmValue::Bool(b) => serde_json::Value::Bool(*b),
1019        VmValue::Nil => serde_json::Value::Null,
1020        VmValue::List(items) => {
1021            serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
1022        }
1023        VmValue::Dict(map) => {
1024            let obj: serde_json::Map<String, serde_json::Value> = map
1025                .iter()
1026                .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1027                .collect();
1028            serde_json::Value::Object(obj)
1029        }
1030        _ => serde_json::Value::Null,
1031    }
1032}
1033
1034fn extract_content_text(result: &serde_json::Value) -> String {
1035    if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
1036        let texts: Vec<&str> = content
1037            .iter()
1038            .filter_map(|item| {
1039                if item.get("type").and_then(|t| t.as_str()) == Some("text") {
1040                    item.get("text").and_then(|t| t.as_str())
1041                } else {
1042                    None
1043                }
1044            })
1045            .collect();
1046        if texts.is_empty() {
1047            json_to_vm_value(result).display()
1048        } else {
1049            texts.join("\n")
1050        }
1051    } else {
1052        json_to_vm_value(result).display()
1053    }
1054}
1055
1056pub(crate) async fn call_mcp_tool(
1057    client: &VmMcpClientHandle,
1058    tool_name: &str,
1059    arguments: serde_json::Value,
1060) -> Result<serde_json::Value, VmError> {
1061    let result = client
1062        .call(
1063            "tools/call",
1064            serde_json::json!({
1065                "name": tool_name,
1066                "arguments": arguments,
1067            }),
1068        )
1069        .await?;
1070
1071    if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
1072        let error_text = extract_content_text(&result);
1073        return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
1074    }
1075
1076    let content = result
1077        .get("content")
1078        .and_then(|c| c.as_array())
1079        .cloned()
1080        .unwrap_or_default();
1081
1082    if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
1083        if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
1084            return Ok(serde_json::Value::String(text.to_string()));
1085        }
1086    }
1087
1088    if content.is_empty() {
1089        Ok(serde_json::Value::Null)
1090    } else {
1091        Ok(serde_json::Value::Array(content))
1092    }
1093}
1094
1095pub async fn connect_mcp_server(
1096    name: &str,
1097    command: &str,
1098    args: &[String],
1099) -> Result<VmMcpClientHandle, VmError> {
1100    let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
1101    handle.name = name.to_string();
1102    Ok(handle)
1103}
1104
1105pub async fn connect_mcp_server_from_spec(
1106    spec: &McpServerSpec,
1107) -> Result<VmMcpClientHandle, VmError> {
1108    let mut handle = match spec.transport {
1109        McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
1110        McpTransport::Http => mcp_connect_http_impl(spec).await?,
1111    };
1112    handle.name = spec.name.clone();
1113    Ok(handle)
1114}
1115
1116pub async fn connect_mcp_server_from_json(
1117    value: &serde_json::Value,
1118) -> Result<VmMcpClientHandle, VmError> {
1119    let spec: McpServerSpec = serde_json::from_value(value.clone())
1120        .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
1121    connect_mcp_server_from_spec(&spec).await
1122}
1123
1124pub fn register_mcp_builtins(vm: &mut Vm) {
1125    vm.register_builtin("mcp_roots", mcp_roots_builtin);
1126    vm.register_builtin("harn.mcp.roots", mcp_roots_builtin);
1127    register_harn_mcp_namespace(vm);
1128
1129    vm.register_async_builtin("mcp_connect", |args| async move {
1130        let command = args.first().map(|a| a.display()).unwrap_or_default();
1131        if command.is_empty() {
1132            return Err(VmError::Thrown(VmValue::String(Rc::from(
1133                "mcp_connect: command is required",
1134            ))));
1135        }
1136
1137        let cmd_args: Vec<String> = match args.get(1) {
1138            Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
1139            _ => Vec::new(),
1140        };
1141
1142        let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
1143        Ok(VmValue::McpClient(handle))
1144    });
1145
1146    // Lazy registry: ensure a registered server is booted and return its
1147    // live client handle. Used by skill activation (`requires_mcp`) and
1148    // by user code that wants to trigger a lazy connect explicitly.
1149    vm.register_async_builtin("mcp_ensure_active", |args| async move {
1150        let name = match args.first() {
1151            Some(VmValue::String(s)) => s.to_string(),
1152            Some(other) => other.display(),
1153            None => String::new(),
1154        };
1155        if name.is_empty() {
1156            return Err(VmError::Thrown(VmValue::String(Rc::from(
1157                "mcp_ensure_active: server name is required",
1158            ))));
1159        }
1160        let handle = crate::mcp_registry::ensure_active(&name).await?;
1161        Ok(VmValue::McpClient(handle))
1162    });
1163
1164    // Decrement the binder refcount for a registered server. Called by
1165    // skill deactivation paths and by user code that manually bound via
1166    // `mcp_ensure_active`. No-op when the name isn't registered.
1167    vm.register_builtin("mcp_release", |args, _out| {
1168        let name = match args.first() {
1169            Some(VmValue::String(s)) => s.to_string(),
1170            Some(other) => other.display(),
1171            None => {
1172                return Err(VmError::Thrown(VmValue::String(Rc::from(
1173                    "mcp_release: server name is required",
1174                ))));
1175            }
1176        };
1177        crate::mcp_registry::release(&name);
1178        Ok(VmValue::Nil)
1179    });
1180
1181    // Return the declared MCP servers and their current state as a list
1182    // of dicts. Purely diagnostic — useful for `harn` scripts that want
1183    // to show connection state in a status-line or dashboard.
1184    vm.register_builtin("mcp_registry_status", |_args, _out| {
1185        let mut out = Vec::new();
1186        for entry in crate::mcp_registry::snapshot_status() {
1187            let mut dict = BTreeMap::new();
1188            dict.insert(
1189                "name".to_string(),
1190                VmValue::String(Rc::from(entry.name.as_str())),
1191            );
1192            dict.insert("lazy".to_string(), VmValue::Bool(entry.lazy));
1193            dict.insert("active".to_string(), VmValue::Bool(entry.active));
1194            dict.insert(
1195                "ref_count".to_string(),
1196                VmValue::Int(entry.ref_count as i64),
1197            );
1198            if let Some(card) = entry.card {
1199                dict.insert("card".to_string(), VmValue::String(Rc::from(card.as_str())));
1200            }
1201            out.push(VmValue::Dict(Rc::new(dict)));
1202        }
1203        Ok(VmValue::List(Rc::new(out)))
1204    });
1205
1206    // Fetch (or read from cache) the Server Card for a registered MCP
1207    // server, or from an explicit URL / local path.
1208    //
1209    // `mcp_server_card("notion")`           -> looks up `card = ...` in harn.toml
1210    // `mcp_server_card("https://.../card")` -> fetches that URL directly
1211    // `mcp_server_card("./card.json")`      -> reads that file directly
1212    vm.register_async_builtin("mcp_server_card", |args| async move {
1213        let target = match args.first() {
1214            Some(VmValue::String(s)) => s.to_string(),
1215            Some(other) => other.display(),
1216            None => {
1217                return Err(VmError::Thrown(VmValue::String(Rc::from(
1218                    "mcp_server_card: server name, URL, or path is required",
1219                ))));
1220            }
1221        };
1222
1223        // Source resolution: if the arg looks like a URL or path
1224        // (contains '/', '\\', or starts with a scheme), use it as-is.
1225        // Otherwise treat it as a registered server name and look up
1226        // its `card` field. This matches the user model: "I already
1227        // wrote down where the card lives in harn.toml — just use it."
1228        let source = if target.starts_with("http://")
1229            || target.starts_with("https://")
1230            || target.contains('/')
1231            || target.contains('\\')
1232            || target.ends_with(".json")
1233        {
1234            target.clone()
1235        } else {
1236            match crate::mcp_registry::get_registration(&target) {
1237                Some(reg) => match reg.card {
1238                    Some(card) => card,
1239                    None => {
1240                        return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1241                            "mcp_server_card: server '{target}' has no 'card' field in harn.toml"
1242                        )))));
1243                    }
1244                },
1245                None => {
1246                    return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1247                        "mcp_server_card: no MCP server '{target}' registered (check harn.toml) \
1248                         — pass a URL or path directly instead"
1249                    )))));
1250                }
1251            }
1252        };
1253
1254        let card = crate::mcp_card::fetch_server_card(&source, None)
1255            .await
1256            .map_err(|e| {
1257                VmError::Thrown(VmValue::String(Rc::from(format!("mcp_server_card: {e}"))))
1258            })?;
1259        Ok(json_to_vm_value(&card))
1260    });
1261
1262    vm.register_async_builtin("mcp_list_tools", |args| async move {
1263        let client = match args.first() {
1264            Some(VmValue::McpClient(c)) => c.clone(),
1265            _ => {
1266                return Err(VmError::Thrown(VmValue::String(Rc::from(
1267                    "mcp_list_tools: argument must be an MCP client",
1268                ))));
1269            }
1270        };
1271
1272        let result = client.call("tools/list", serde_json::json!({})).await?;
1273        let mut tools = result
1274            .get("tools")
1275            .and_then(|t| t.as_array())
1276            .cloned()
1277            .unwrap_or_default();
1278
1279        // Tag every tool with its originating server name so
1280        // downstream indexers (tool_search BM25) can surface them
1281        // under queries like "github" or "mcp:github". Harmless to
1282        // non-indexing callers — just an extra dict key.
1283        let server_name = client.name.clone();
1284        for tool in tools.iter_mut() {
1285            if let Some(obj) = tool.as_object_mut() {
1286                obj.entry("_mcp_server")
1287                    .or_insert_with(|| serde_json::Value::String(server_name.clone()));
1288            }
1289        }
1290
1291        let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
1292        Ok(VmValue::List(Rc::new(vm_tools)))
1293    });
1294
1295    vm.register_async_builtin("mcp_call", |args| async move {
1296        let client = match args.first() {
1297            Some(VmValue::McpClient(c)) => c.clone(),
1298            _ => {
1299                return Err(VmError::Thrown(VmValue::String(Rc::from(
1300                    "mcp_call: first argument must be an MCP client",
1301                ))));
1302            }
1303        };
1304
1305        let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
1306        if tool_name.is_empty() {
1307            return Err(VmError::Thrown(VmValue::String(Rc::from(
1308                "mcp_call: tool name is required",
1309            ))));
1310        }
1311
1312        let arguments = match args.get(2) {
1313            Some(VmValue::Dict(d)) => {
1314                let obj: serde_json::Map<String, serde_json::Value> = d
1315                    .iter()
1316                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1317                    .collect();
1318                serde_json::Value::Object(obj)
1319            }
1320            _ => serde_json::json!({}),
1321        };
1322
1323        Ok(json_to_vm_value(
1324            &call_mcp_tool(&client, &tool_name, arguments).await?,
1325        ))
1326    });
1327
1328    vm.register_async_builtin("mcp_server_info", |args| async move {
1329        let client = match args.first() {
1330            Some(VmValue::McpClient(c)) => c.clone(),
1331            _ => {
1332                return Err(VmError::Thrown(VmValue::String(Rc::from(
1333                    "mcp_server_info: argument must be an MCP client",
1334                ))));
1335            }
1336        };
1337
1338        let guard = client.inner.lock().await;
1339        if guard.is_none() {
1340            return Err(VmError::Runtime("MCP client is disconnected".into()));
1341        }
1342        drop(guard);
1343
1344        let mut info = BTreeMap::new();
1345        info.insert(
1346            "name".to_string(),
1347            VmValue::String(Rc::from(client.name.as_str())),
1348        );
1349        info.insert("connected".to_string(), VmValue::Bool(true));
1350        let initialize = client
1351            .initialize_result
1352            .lock()
1353            .await
1354            .clone()
1355            .unwrap_or(serde_json::Value::Null);
1356        if !initialize.is_null() {
1357            if let Some(instructions) = initialize
1358                .get("instructions")
1359                .or_else(|| {
1360                    initialize
1361                        .get("serverInfo")
1362                        .and_then(|value| value.get("instructions"))
1363                })
1364                .and_then(|value| value.as_str())
1365                .filter(|value| !value.is_empty())
1366            {
1367                info.insert(
1368                    "instructions".to_string(),
1369                    VmValue::String(Rc::from(instructions)),
1370                );
1371            }
1372            info.insert("initialize".to_string(), json_to_vm_value(&initialize));
1373        }
1374        Ok(VmValue::Dict(Rc::new(info)))
1375    });
1376
1377    vm.register_async_builtin("mcp_disconnect", |args| async move {
1378        let client = match args.first() {
1379            Some(VmValue::McpClient(c)) => c.clone(),
1380            _ => {
1381                return Err(VmError::Thrown(VmValue::String(Rc::from(
1382                    "mcp_disconnect: argument must be an MCP client",
1383                ))));
1384            }
1385        };
1386
1387        client.disconnect().await?;
1388        Ok(VmValue::Nil)
1389    });
1390
1391    vm.register_async_builtin("mcp_list_resources", |args| async move {
1392        let client = match args.first() {
1393            Some(VmValue::McpClient(c)) => c.clone(),
1394            _ => {
1395                return Err(VmError::Thrown(VmValue::String(Rc::from(
1396                    "mcp_list_resources: argument must be an MCP client",
1397                ))));
1398            }
1399        };
1400
1401        let result = client.call("resources/list", serde_json::json!({})).await?;
1402        let resources = result
1403            .get("resources")
1404            .and_then(|r| r.as_array())
1405            .cloned()
1406            .unwrap_or_default();
1407
1408        let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
1409        Ok(VmValue::List(Rc::new(vm_resources)))
1410    });
1411
1412    vm.register_async_builtin("mcp_read_resource", |args| async move {
1413        let client = match args.first() {
1414            Some(VmValue::McpClient(c)) => c.clone(),
1415            _ => {
1416                return Err(VmError::Thrown(VmValue::String(Rc::from(
1417                    "mcp_read_resource: first argument must be an MCP client",
1418                ))));
1419            }
1420        };
1421
1422        let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
1423        if uri.is_empty() {
1424            return Err(VmError::Thrown(VmValue::String(Rc::from(
1425                "mcp_read_resource: URI is required",
1426            ))));
1427        }
1428
1429        let result = client
1430            .call("resources/read", serde_json::json!({ "uri": uri }))
1431            .await?;
1432
1433        let contents = result
1434            .get("contents")
1435            .and_then(|c| c.as_array())
1436            .cloned()
1437            .unwrap_or_default();
1438
1439        if contents.len() == 1 {
1440            if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
1441                return Ok(VmValue::String(Rc::from(text)));
1442            }
1443        }
1444
1445        if contents.is_empty() {
1446            Ok(VmValue::Nil)
1447        } else {
1448            Ok(VmValue::List(Rc::new(
1449                contents.iter().map(json_to_vm_value).collect(),
1450            )))
1451        }
1452    });
1453
1454    vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
1455        let client = match args.first() {
1456            Some(VmValue::McpClient(c)) => c.clone(),
1457            _ => {
1458                return Err(VmError::Thrown(VmValue::String(Rc::from(
1459                    "mcp_list_resource_templates: argument must be an MCP client",
1460                ))));
1461            }
1462        };
1463
1464        let result = client
1465            .call("resources/templates/list", serde_json::json!({}))
1466            .await?;
1467
1468        let templates = result
1469            .get("resourceTemplates")
1470            .and_then(|r| r.as_array())
1471            .cloned()
1472            .unwrap_or_default();
1473
1474        let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
1475        Ok(VmValue::List(Rc::new(vm_templates)))
1476    });
1477
1478    vm.register_async_builtin("mcp_list_prompts", |args| async move {
1479        let client = match args.first() {
1480            Some(VmValue::McpClient(c)) => c.clone(),
1481            _ => {
1482                return Err(VmError::Thrown(VmValue::String(Rc::from(
1483                    "mcp_list_prompts: argument must be an MCP client",
1484                ))));
1485            }
1486        };
1487
1488        let result = client.call("prompts/list", serde_json::json!({})).await?;
1489
1490        let prompts = result
1491            .get("prompts")
1492            .and_then(|p| p.as_array())
1493            .cloned()
1494            .unwrap_or_default();
1495
1496        let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
1497        Ok(VmValue::List(Rc::new(vm_prompts)))
1498    });
1499
1500    vm.register_async_builtin("mcp_get_prompt", |args| async move {
1501        let client = match args.first() {
1502            Some(VmValue::McpClient(c)) => c.clone(),
1503            _ => {
1504                return Err(VmError::Thrown(VmValue::String(Rc::from(
1505                    "mcp_get_prompt: first argument must be an MCP client",
1506                ))));
1507            }
1508        };
1509
1510        let name = args.get(1).map(|a| a.display()).unwrap_or_default();
1511        if name.is_empty() {
1512            return Err(VmError::Thrown(VmValue::String(Rc::from(
1513                "mcp_get_prompt: prompt name is required",
1514            ))));
1515        }
1516
1517        let arguments = match args.get(2) {
1518            Some(VmValue::Dict(d)) => {
1519                let obj: serde_json::Map<String, serde_json::Value> = d
1520                    .iter()
1521                    .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1522                    .collect();
1523                serde_json::Value::Object(obj)
1524            }
1525            _ => serde_json::json!({}),
1526        };
1527
1528        let result = client
1529            .call(
1530                "prompts/get",
1531                serde_json::json!({
1532                    "name": name,
1533                    "arguments": arguments,
1534                }),
1535            )
1536            .await?;
1537
1538        Ok(json_to_vm_value(&result))
1539    });
1540}
1541
1542fn mcp_roots_builtin(_args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
1543    Ok(VmValue::List(Rc::new(
1544        current_mcp_roots()
1545            .iter()
1546            .map(|root| json_to_vm_value(&root.script_json()))
1547            .collect(),
1548    )))
1549}
1550
1551fn register_harn_mcp_namespace(vm: &mut Vm) {
1552    let mcp_namespace = VmValue::Dict(Rc::new(BTreeMap::from([
1553        (
1554            "_namespace".to_string(),
1555            VmValue::String(Rc::from("harn.mcp")),
1556        ),
1557        (
1558            "roots".to_string(),
1559            VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1560        ),
1561    ])));
1562    vm.set_global(
1563        "harn",
1564        VmValue::Dict(Rc::new(BTreeMap::from([
1565            ("_namespace".to_string(), VmValue::String(Rc::from("harn"))),
1566            (
1567                "mcp_roots".to_string(),
1568                VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1569            ),
1570            ("mcp".to_string(), mcp_namespace),
1571        ]))),
1572    );
1573}
1574
1575#[cfg(test)]
1576mod tests {
1577    use super::*;
1578    use tokio::io::{AsyncReadExt, AsyncWriteExt};
1579    use tokio::net::{TcpListener, TcpStream};
1580    use tokio::sync::mpsc;
1581
1582    #[tokio::test(flavor = "current_thread")]
1583    async fn http_get_stream_dispatches_inbound_elicitation_response() {
1584        tokio::task::LocalSet::new()
1585            .run_until(async {
1586                let (base_url, mut responses) = spawn_eliciting_http_mcp_server().await;
1587                let spec = McpServerSpec {
1588                    name: "mock-http".to_string(),
1589                    transport: McpTransport::Http,
1590                    command: String::new(),
1591                    args: Vec::new(),
1592                    env: BTreeMap::new(),
1593                    url: format!("{base_url}/mcp"),
1594                    auth_token: None,
1595                    protocol_version: None,
1596                    proxy_server_name: None,
1597                };
1598
1599                let handle = connect_mcp_server_from_spec(&spec).await.unwrap();
1600                let response = tokio::time::timeout(MCP_TIMEOUT, responses.recv())
1601                    .await
1602                    .expect("timed out waiting for elicitation response POST")
1603                    .expect("mock server closed before receiving elicitation response");
1604
1605                assert_eq!(response["id"], serde_json::json!(99));
1606                assert_eq!(
1607                    response["result"]["action"],
1608                    serde_json::json!("decline"),
1609                    "without a host bridge, inbound elicitation should decline cleanly"
1610                );
1611                handle.disconnect().await.unwrap();
1612            })
1613            .await;
1614    }
1615
1616    async fn spawn_eliciting_http_mcp_server(
1617    ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
1618        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1619        let addr = listener.local_addr().unwrap();
1620        let (response_tx, response_rx) = mpsc::unbounded_channel();
1621
1622        tokio::spawn(async move {
1623            loop {
1624                let Ok((stream, _)) = listener.accept().await else {
1625                    break;
1626                };
1627                let response_tx = response_tx.clone();
1628                tokio::spawn(async move {
1629                    let _ = handle_mock_http_mcp_connection(stream, response_tx).await;
1630                });
1631            }
1632        });
1633
1634        (format!("http://{addr}"), response_rx)
1635    }
1636
1637    async fn spawn_recording_http_mcp_server(
1638    ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
1639        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1640        let addr = listener.local_addr().unwrap();
1641        let (request_tx, request_rx) = mpsc::unbounded_channel();
1642
1643        tokio::spawn(async move {
1644            loop {
1645                let Ok((mut stream, _)) = listener.accept().await else {
1646                    break;
1647                };
1648                let request_tx = request_tx.clone();
1649                tokio::spawn(async move {
1650                    let Ok((_request_line, _headers, body)) = read_http_request(&mut stream).await
1651                    else {
1652                        return;
1653                    };
1654                    if let Ok(request) = serde_json::from_slice::<serde_json::Value>(&body) {
1655                        let _ = request_tx.send(request);
1656                    }
1657                    let _ = write_http_empty(&mut stream, "202 Accepted").await;
1658                });
1659            }
1660        });
1661
1662        (format!("http://{addr}"), request_rx)
1663    }
1664
1665    async fn handle_mock_http_mcp_connection(
1666        mut stream: TcpStream,
1667        response_tx: mpsc::UnboundedSender<serde_json::Value>,
1668    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1669        let (request_line, headers, body) = read_http_request(&mut stream).await?;
1670        if request_line.starts_with("GET ") {
1671            let response = concat!(
1672                "HTTP/1.1 200 OK\r\n",
1673                "content-type: text/event-stream\r\n",
1674                "cache-control: no-cache\r\n",
1675                "\r\n",
1676                "id: prime\r\n",
1677                "data: \r\n",
1678                "\r\n",
1679                "id: elicit-1\r\n",
1680                "event: message\r\n",
1681                "data: {\"jsonrpc\":\"2.0\",\"id\":99,\"method\":\"elicitation/create\",\"params\":{\"message\":\"Need input\",\"requestedSchema\":{\"type\":\"object\",\"properties\":{}}}}\r\n",
1682                "\r\n"
1683            );
1684            stream.write_all(response.as_bytes()).await?;
1685            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
1686            return Ok(());
1687        }
1688
1689        let request: serde_json::Value = serde_json::from_slice(&body)?;
1690        let method = request.get("method").and_then(|value| value.as_str());
1691        match method {
1692            Some("initialize") => {
1693                write_http_json(
1694                    &mut stream,
1695                    "200 OK",
1696                    &[("MCP-Session-Id", "test-session")],
1697                    serde_json::json!({
1698                        "jsonrpc": "2.0",
1699                        "id": request["id"].clone(),
1700                        "result": {
1701                            "protocolVersion": PROTOCOL_VERSION,
1702                            "capabilities": {
1703                                "elicitation": {},
1704                                "tools": {}
1705                            },
1706                            "serverInfo": {
1707                                "name": "mock",
1708                                "version": "0.0.0"
1709                            }
1710                        }
1711                    }),
1712                )
1713                .await?;
1714            }
1715            Some("notifications/initialized") => {
1716                write_http_empty(&mut stream, "202 Accepted").await?;
1717            }
1718            _ if request.get("result").is_some() || request.get("error").is_some() => {
1719                assert_eq!(
1720                    headers.get("mcp-session-id").map(String::as_str),
1721                    Some("test-session")
1722                );
1723                let _ = response_tx.send(request);
1724                write_http_empty(&mut stream, "202 Accepted").await?;
1725            }
1726            _ => {
1727                write_http_json(
1728                    &mut stream,
1729                    "200 OK",
1730                    &[],
1731                    serde_json::json!({
1732                        "jsonrpc": "2.0",
1733                        "id": request["id"].clone(),
1734                        "result": {}
1735                    }),
1736                )
1737                .await?;
1738            }
1739        }
1740        Ok(())
1741    }
1742
1743    async fn read_http_request(
1744        stream: &mut TcpStream,
1745    ) -> Result<(String, BTreeMap<String, String>, Vec<u8>), Box<dyn std::error::Error + Send + Sync>>
1746    {
1747        let mut buffer = Vec::new();
1748        loop {
1749            let mut chunk = [0; 1024];
1750            let bytes = stream.read(&mut chunk).await?;
1751            if bytes == 0 {
1752                break;
1753            }
1754            buffer.extend_from_slice(&chunk[..bytes]);
1755            if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
1756                break;
1757            }
1758        }
1759        let header_end = buffer
1760            .windows(4)
1761            .position(|window| window == b"\r\n\r\n")
1762            .ok_or("missing HTTP header terminator")?;
1763        let header_text = String::from_utf8(buffer[..header_end].to_vec())?;
1764        let mut lines = header_text.lines();
1765        let request_line = lines.next().unwrap_or_default().to_string();
1766        let mut headers = BTreeMap::new();
1767        for line in lines {
1768            if let Some((name, value)) = line.split_once(':') {
1769                headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
1770            }
1771        }
1772        let content_length = headers
1773            .get("content-length")
1774            .and_then(|value| value.parse::<usize>().ok())
1775            .unwrap_or(0);
1776        let mut body = buffer[header_end + 4..].to_vec();
1777        while body.len() < content_length {
1778            let mut chunk = vec![0; content_length - body.len()];
1779            let bytes = stream.read(&mut chunk).await?;
1780            if bytes == 0 {
1781                break;
1782            }
1783            body.extend_from_slice(&chunk[..bytes]);
1784        }
1785        body.truncate(content_length);
1786        Ok((request_line, headers, body))
1787    }
1788
1789    async fn write_http_json(
1790        stream: &mut TcpStream,
1791        status: &str,
1792        headers: &[(&str, &str)],
1793        body: serde_json::Value,
1794    ) -> Result<(), std::io::Error> {
1795        let body = serde_json::to_string(&body).unwrap();
1796        let mut response = format!(
1797            "HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\n",
1798            body.len()
1799        );
1800        for (name, value) in headers {
1801            response.push_str(name);
1802            response.push_str(": ");
1803            response.push_str(value);
1804            response.push_str("\r\n");
1805        }
1806        response.push_str("\r\n");
1807        response.push_str(&body);
1808        stream.write_all(response.as_bytes()).await
1809    }
1810
1811    async fn write_http_empty(stream: &mut TcpStream, status: &str) -> Result<(), std::io::Error> {
1812        let response = format!("HTTP/1.1 {status}\r\ncontent-length: 0\r\n\r\n");
1813        stream.write_all(response.as_bytes()).await
1814    }
1815
1816    #[test]
1817    fn test_vm_value_to_serde_string() {
1818        let val = VmValue::String(Rc::from("hello"));
1819        let json = vm_value_to_serde(&val);
1820        assert_eq!(json, serde_json::json!("hello"));
1821    }
1822
1823    #[test]
1824    fn test_vm_value_to_serde_dict() {
1825        let mut map = BTreeMap::new();
1826        map.insert("key".to_string(), VmValue::Int(42));
1827        let val = VmValue::Dict(Rc::new(map));
1828        let json = vm_value_to_serde(&val);
1829        assert_eq!(json, serde_json::json!({"key": 42}));
1830    }
1831
1832    #[test]
1833    fn test_vm_value_to_serde_list() {
1834        let val = VmValue::List(Rc::new(vec![VmValue::Int(1), VmValue::Int(2)]));
1835        let json = vm_value_to_serde(&val);
1836        assert_eq!(json, serde_json::json!([1, 2]));
1837    }
1838
1839    #[test]
1840    fn test_extract_content_text_single() {
1841        let result = serde_json::json!({
1842            "content": [{"type": "text", "text": "hello world"}],
1843            "isError": false
1844        });
1845        assert_eq!(extract_content_text(&result), "hello world");
1846    }
1847
1848    #[test]
1849    fn test_extract_content_text_multiple() {
1850        let result = serde_json::json!({
1851            "content": [
1852                {"type": "text", "text": "first"},
1853                {"type": "text", "text": "second"}
1854            ],
1855            "isError": false
1856        });
1857        assert_eq!(extract_content_text(&result), "first\nsecond");
1858    }
1859
1860    #[test]
1861    fn test_extract_content_text_fallback_json() {
1862        let result = serde_json::json!({
1863            "content": [{"type": "image", "data": "abc"}],
1864            "isError": false
1865        });
1866        let output = extract_content_text(&result);
1867        assert!(output.contains("image"));
1868    }
1869
1870    #[tokio::test(flavor = "current_thread")]
1871    async fn test_parse_sse_jsonrpc_body_uses_matching_jsonrpc_response() {
1872        let inner = HttpMcpClientInner {
1873            client: reqwest::Client::new(),
1874            url: "http://127.0.0.1/mcp".to_string(),
1875            auth_token: None,
1876            protocol_version: PROTOCOL_VERSION.to_string(),
1877            session_id: None,
1878            next_id: 1,
1879            proxy_server_name: None,
1880            get_stream_task: None,
1881        };
1882        let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
1883        let parsed = parse_sse_jsonrpc_body(&inner, "mock", body, Some(1))
1884            .await
1885            .unwrap();
1886        assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
1887    }
1888
1889    #[test]
1890    fn client_rejects_unadvertised_server_to_client_requests() {
1891        let unknown = client_request_rejection(&serde_json::json!({
1892            "jsonrpc": "2.0",
1893            "id": "custom-1",
1894            "method": "custom/method",
1895            "params": {}
1896        }))
1897        .expect("rejection");
1898        assert_eq!(unknown["error"]["code"], serde_json::json!(-32601));
1899        assert!(unknown["error"].get("data").is_none());
1900    }
1901
1902    #[test]
1903    fn current_mcp_roots_prefers_project_root_over_child_cwd() {
1904        let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
1905        let child = root.join("nested");
1906        std::fs::create_dir_all(&child).unwrap();
1907        std::fs::write(root.join("harn.toml"), "[package]\nname = \"roots\"\n").unwrap();
1908
1909        crate::stdlib::process::set_thread_execution_context(Some(
1910            crate::orchestration::RunExecutionRecord {
1911                cwd: Some(child.to_string_lossy().into_owned()),
1912                source_dir: Some(child.to_string_lossy().into_owned()),
1913                ..Default::default()
1914            },
1915        ));
1916
1917        let roots = current_mcp_roots();
1918        let expected_root = std::fs::canonicalize(&root).unwrap();
1919        assert_eq!(roots.len(), 1);
1920        assert_eq!(roots[0].path, expected_root.to_string_lossy());
1921        assert!(roots[0].uri.starts_with("file://"));
1922        assert_eq!(
1923            roots[0].name,
1924            expected_root.file_name().unwrap().to_string_lossy()
1925        );
1926
1927        crate::stdlib::process::reset_process_state();
1928        let _ = std::fs::remove_dir_all(&root);
1929    }
1930
1931    #[tokio::test(flavor = "current_thread")]
1932    async fn handle_inbound_routes_roots_list() {
1933        let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
1934        std::fs::create_dir_all(&root).unwrap();
1935        crate::stdlib::process::set_thread_execution_context(Some(
1936            crate::orchestration::RunExecutionRecord {
1937                cwd: Some(root.to_string_lossy().into_owned()),
1938                ..Default::default()
1939            },
1940        ));
1941
1942        let request = serde_json::json!({
1943            "jsonrpc": "2.0",
1944            "id": "roots-1",
1945            "method": crate::mcp_protocol::METHOD_ROOTS_LIST,
1946        });
1947        let response = handle_inbound_client_request("mock", &request)
1948            .await
1949            .expect("roots/list should produce a response");
1950        let expected_root = std::fs::canonicalize(&root).unwrap();
1951        assert_eq!(response["id"], serde_json::json!("roots-1"));
1952        assert_eq!(response["result"]["roots"].as_array().unwrap().len(), 1);
1953        assert_eq!(
1954            response["result"]["roots"][0]["uri"],
1955            serde_json::json!(url::Url::from_file_path(&expected_root)
1956                .unwrap()
1957                .to_string())
1958        );
1959
1960        crate::stdlib::process::reset_process_state();
1961        let _ = std::fs::remove_dir_all(&root);
1962    }
1963
1964    #[tokio::test(flavor = "current_thread")]
1965    async fn roots_list_changed_notification_is_sent_once_per_snapshot() {
1966        tokio::task::LocalSet::new()
1967            .run_until(async {
1968                let (base_url, mut requests) = spawn_recording_http_mcp_server().await;
1969                let handle = VmMcpClientHandle {
1970                    name: "mock-http".to_string(),
1971                    inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
1972                        client: reqwest::Client::new(),
1973                        url: format!("{base_url}/mcp"),
1974                        auth_token: None,
1975                        protocol_version: PROTOCOL_VERSION.to_string(),
1976                        session_id: None,
1977                        next_id: 1,
1978                        proxy_server_name: None,
1979                        get_stream_task: None,
1980                    })))),
1981                    last_roots: Arc::new(Mutex::new(Vec::new())),
1982                    initialize_result: Arc::new(Mutex::new(None)),
1983                };
1984
1985                handle.notify_roots_list_changed_if_needed().await.unwrap();
1986                let notification = tokio::time::timeout(MCP_TIMEOUT, requests.recv())
1987                    .await
1988                    .expect("timed out waiting for roots notification")
1989                    .expect("mock server closed before notification");
1990                assert_eq!(
1991                    notification["method"],
1992                    serde_json::json!(crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION)
1993                );
1994
1995                handle.notify_roots_list_changed_if_needed().await.unwrap();
1996                assert!(
1997                    tokio::time::timeout(std::time::Duration::from_millis(50), requests.recv())
1998                        .await
1999                        .is_err(),
2000                    "unchanged roots should not send another notification"
2001                );
2002            })
2003            .await;
2004    }
2005
2006    #[tokio::test(flavor = "current_thread")]
2007    async fn handle_inbound_routes_sampling_to_dispatcher() {
2008        // Confirms `sampling/createMessage` is routed to
2009        // `mcp_sampling::dispatch_inbound_sampling` rather than the
2010        // generic rejection path. With no host bridge installed, the
2011        // dispatcher declines with the structured `mcp.samplingDeclined`
2012        // error envelope — proving the request reached the right
2013        // handler instead of being bounced as `Method not found`.
2014        let request = serde_json::json!({
2015            "jsonrpc": "2.0",
2016            "id": 42,
2017            "method": crate::mcp_sampling::SAMPLING_METHOD,
2018            "params": {
2019                "messages": [
2020                    {"role": "user", "content": {"type": "text", "text": "ping"}}
2021                ],
2022                "maxTokens": 4,
2023            },
2024        });
2025        let response = handle_inbound_client_request("mock", &request)
2026            .await
2027            .expect("sampling should produce a response");
2028        assert_eq!(response["id"], serde_json::json!(42));
2029        assert_eq!(
2030            response["error"]["data"]["type"],
2031            serde_json::json!("mcp.samplingDeclined")
2032        );
2033    }
2034}