1use std::collections::BTreeMap;
6use std::path::{Path, PathBuf};
7use std::rc::Rc;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use reqwest_eventsource::{Event as SseEvent, EventSource};
12use serde::Deserialize;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::{Child, ChildStdin, ChildStdout};
15use tokio::sync::Mutex;
16
17use crate::stdlib::json_to_vm_value;
18use crate::value::{VmError, VmValue};
19use crate::vm::Vm;
20
/// MCP protocol revision sent as `protocolVersion` in `initialize` requests and used as the
/// default `MCP-Protocol-Version` header value for HTTP connections.
const PROTOCOL_VERSION: &str = "2025-11-25";

/// Upper bound applied to each stdio response-line read and to each outgoing HTTP request.
const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
26
/// Transport used to reach an MCP server.
///
/// Deserialized from a lowercase string (`"stdio"` or `"http"`).
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum McpTransport {
    /// Communicate with a spawned child process over its stdin/stdout.
    Stdio,
    /// Communicate with a server over HTTP.
    Http,
}
33
/// Deserializable description of one MCP server connection.
///
/// Every field except `name` is optional in the serialized form and falls back to its
/// default (`transport` defaults to stdio).
#[derive(Clone, Debug, Deserialize)]
pub struct McpServerSpec {
    /// Name given to the resulting client handle.
    pub name: String,
    /// Which transport to connect with; stdio when omitted.
    #[serde(default = "default_transport")]
    transport: McpTransport,
    /// Executable to spawn (stdio transport).
    #[serde(default)]
    pub command: String,
    /// Arguments passed to `command` (stdio transport).
    #[serde(default)]
    pub args: Vec<String>,
    /// Extra environment variables set on the spawned process (stdio transport).
    #[serde(default)]
    pub env: BTreeMap<String, String>,
    /// Endpoint URL (HTTP transport).
    #[serde(default)]
    pub url: String,
    /// Optional token sent as `Authorization: Bearer <token>` (HTTP transport).
    #[serde(default)]
    pub auth_token: Option<String>,
    /// Optional initial `MCP-Protocol-Version` header value; `PROTOCOL_VERSION` when absent.
    #[serde(default)]
    pub protocol_version: Option<String>,
    /// When set, every HTTP payload is wrapped with a `serverName` field carrying this value.
    #[serde(default)]
    pub proxy_server_name: Option<String>,
}
54
/// Serde default for `McpServerSpec::transport`: stdio.
fn default_transport() -> McpTransport {
    McpTransport::Stdio
}
58
/// Transport-specific connection state held behind a client handle.
enum McpClientInner {
    /// State for a server reached through a child process' stdin/stdout.
    Stdio(StdioMcpClientInner),
    /// State for a server reached over HTTP.
    Http(HttpMcpClientInner),
}
64
/// Connection state for a stdio MCP server.
struct StdioMcpClientInner {
    /// The spawned server process; killed on disconnect.
    child: Child,
    /// Write end used to send newline-delimited JSON-RPC messages.
    stdin: ChildStdin,
    /// Buffered read end from which responses are read line by line.
    reader: BufReader<ChildStdout>,
    /// Id assigned to the next outgoing JSON-RPC request.
    next_id: u64,
}
71
/// Connection state for an HTTP MCP server.
struct HttpMcpClientInner {
    /// HTTP client used for every request to the server.
    client: reqwest::Client,
    /// Endpoint that receives POSTed JSON-RPC payloads and serves the GET event stream.
    url: String,
    /// Optional bearer token added to the `Authorization` header.
    auth_token: Option<String>,
    /// Value sent as `MCP-Protocol-Version`; replaced whenever a response carries that header.
    protocol_version: String,
    /// Session id taken from the `MCP-Session-Id` response header and echoed on later requests.
    session_id: Option<String>,
    /// Id assigned to the next outgoing JSON-RPC request.
    next_id: u64,
    /// When set, payloads are wrapped with a `serverName` field carrying this value.
    proxy_server_name: Option<String>,
    /// Handle of the background task reading the GET event stream, if one was started.
    get_stream_task: Option<tokio::task::JoinHandle<()>>,
}
82
/// One filesystem root advertised to MCP servers.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct McpRoot {
    /// Filesystem path of the root (lossy UTF-8 rendering).
    path: String,
    /// `file://` URI derived from the path.
    uri: String,
    /// Display name: the final path component, or the full path when there is none.
    name: String,
}
89
90impl McpRoot {
91 fn protocol_json(&self) -> serde_json::Value {
92 serde_json::json!({
93 "uri": self.uri,
94 "name": self.name,
95 })
96 }
97
98 fn script_json(&self) -> serde_json::Value {
99 serde_json::json!({
100 "uri": self.uri,
101 "name": self.name,
102 "path": self.path,
103 })
104 }
105}
106
107impl HttpMcpClientInner {
108 fn abort_get_stream(&mut self) {
109 if let Some(task) = self.get_stream_task.take() {
110 task.abort();
111 }
112 }
113}
114
/// Cloneable handle to an MCP client connection; clones share the same underlying state.
#[derive(Clone)]
pub struct VmMcpClientHandle {
    /// Name of the server this handle talks to.
    pub name: String,
    /// Transport state; `None` once the client has been disconnected.
    inner: Arc<Mutex<Option<McpClientInner>>>,
    /// Roots as of the last `roots/list_changed` notification sent (initially empty).
    last_roots: Arc<Mutex<Vec<McpRoot>>>,
    /// Result of the `initialize` request, stored once the handshake has completed.
    pub(crate) initialize_result: Arc<Mutex<Option<serde_json::Value>>>,
}
123
124impl std::fmt::Debug for VmMcpClientHandle {
125 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126 write!(f, "McpClient({})", self.name)
127 }
128}
129
130impl VmMcpClientHandle {
131 pub(crate) async fn call(
132 &self,
133 method: &str,
134 params: serde_json::Value,
135 ) -> Result<serde_json::Value, VmError> {
136 if method != "initialize" {
137 self.notify_roots_list_changed_if_needed().await?;
138 }
139 let mut guard = self.inner.lock().await;
140 let inner = guard
141 .as_mut()
142 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
143
144 match inner {
145 McpClientInner::Stdio(inner) => stdio_call(inner, &self.name, method, params).await,
146 McpClientInner::Http(inner) => http_call(inner, &self.name, method, params).await,
147 }
148 }
149
150 async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
151 let mut guard = self.inner.lock().await;
152 let inner = guard
153 .as_mut()
154 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
155
156 match inner {
157 McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
158 McpClientInner::Http(inner) => http_notify(inner, &self.name, method, params).await,
159 }
160 }
161
162 pub(crate) async fn disconnect(&self) -> Result<(), VmError> {
163 let mut guard = self.inner.lock().await;
164 if let Some(inner) = guard.take() {
165 match inner {
166 McpClientInner::Stdio(mut inner) => {
167 let _ = inner.child.kill().await;
168 }
169 McpClientInner::Http(mut inner) => {
170 inner.abort_get_stream();
171 }
172 }
173 }
174 Ok(())
175 }
176
177 async fn notify_roots_list_changed_if_needed(&self) -> Result<(), VmError> {
178 let roots = current_mcp_roots();
179 let mut last_roots = self.last_roots.lock().await;
180 if *last_roots == roots {
181 return Ok(());
182 }
183
184 self.notify(
185 crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION,
186 serde_json::json!({}),
187 )
188 .await?;
189 *last_roots = roots;
190 Ok(())
191 }
192}
193
194async fn stdio_call(
195 inner: &mut StdioMcpClientInner,
196 server_name: &str,
197 method: &str,
198 params: serde_json::Value,
199) -> Result<serde_json::Value, VmError> {
200 let id = inner.next_id;
201 inner.next_id += 1;
202
203 let request = serde_json::json!({
204 "jsonrpc": "2.0",
205 "id": id,
206 "method": method,
207 "params": params,
208 });
209
210 let line = serde_json::to_string(&request)
211 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
212 inner
213 .stdin
214 .write_all(line.as_bytes())
215 .await
216 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
217 inner
218 .stdin
219 .write_all(b"\n")
220 .await
221 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
222 inner
223 .stdin
224 .flush()
225 .await
226 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
227
228 let mut line_buf = String::new();
229 loop {
230 line_buf.clear();
231 let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
232 .await
233 .map_err(|_| {
234 VmError::Runtime(format!(
235 "MCP: server did not respond to '{method}' within {}s",
236 MCP_TIMEOUT.as_secs()
237 ))
238 })?
239 .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
240
241 if bytes_read == 0 {
242 return Err(VmError::Runtime("MCP: server closed connection".into()));
243 }
244
245 let trimmed = line_buf.trim();
246 if trimmed.is_empty() {
247 continue;
248 }
249
250 let msg: serde_json::Value = match serde_json::from_str(trimmed) {
251 Ok(v) => v,
252 Err(_) => continue,
253 };
254
255 if msg.get("id").is_none() {
256 continue;
257 }
258
259 if msg["id"].as_u64() == Some(id)
260 && (msg.get("result").is_some() || msg.get("error").is_some())
261 {
262 return parse_jsonrpc_result(msg);
263 }
264
265 let response = match handle_inbound_client_request(server_name, &msg).await {
266 Some(response) => response,
267 None => continue,
268 };
269 let line = serde_json::to_string(&response)
270 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
271 inner
272 .stdin
273 .write_all(line.as_bytes())
274 .await
275 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
276 inner
277 .stdin
278 .write_all(b"\n")
279 .await
280 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
281 inner
282 .stdin
283 .flush()
284 .await
285 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
286 }
287}
288
289async fn handle_inbound_client_request(
294 server_name: &str,
295 msg: &serde_json::Value,
296) -> Option<serde_json::Value> {
297 let method = msg.get("method").and_then(|value| value.as_str())?;
298 if method == crate::mcp_elicit::ELICITATION_METHOD {
299 return Some(crate::mcp_elicit::dispatch_inbound_elicitation(server_name, msg).await);
300 }
301 if method == crate::mcp_sampling::SAMPLING_METHOD {
302 return Some(crate::mcp_sampling::dispatch_inbound_sampling(server_name, msg).await);
303 }
304 if method == crate::mcp_protocol::METHOD_ROOTS_LIST {
305 let id = msg.get("id")?.clone();
306 return Some(harn_roots_list_response(id));
307 }
308 client_request_rejection(msg)
309}
310
311async fn stdio_notify(
312 inner: &mut StdioMcpClientInner,
313 method: &str,
314 params: serde_json::Value,
315) -> Result<(), VmError> {
316 let notification = serde_json::json!({
317 "jsonrpc": "2.0",
318 "method": method,
319 "params": params,
320 });
321
322 let line = serde_json::to_string(¬ification)
323 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
324 inner
325 .stdin
326 .write_all(line.as_bytes())
327 .await
328 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
329 inner
330 .stdin
331 .write_all(b"\n")
332 .await
333 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
334 inner
335 .stdin
336 .flush()
337 .await
338 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
339 Ok(())
340}
341
342async fn http_call(
343 inner: &mut HttpMcpClientInner,
344 server_name: &str,
345 method: &str,
346 params: serde_json::Value,
347) -> Result<serde_json::Value, VmError> {
348 let id = inner.next_id;
349 inner.next_id += 1;
350 send_http_request(inner, server_name, method, params, Some(id)).await
351}
352
353async fn http_notify(
354 inner: &mut HttpMcpClientInner,
355 server_name: &str,
356 method: &str,
357 params: serde_json::Value,
358) -> Result<(), VmError> {
359 let _ = send_http_request(inner, server_name, method, params, None).await?;
360 Ok(())
361}
362
363async fn send_http_request(
364 inner: &mut HttpMcpClientInner,
365 server_name: &str,
366 method: &str,
367 params: serde_json::Value,
368 id: Option<u64>,
369) -> Result<serde_json::Value, VmError> {
370 for attempt in 0..2 {
371 let response = send_http_request_once(inner, method, params.clone(), id).await?;
372
373 let status = response.status().as_u16();
374 let headers = response.headers().clone();
375 if let Some(protocol_version) = headers
376 .get("MCP-Protocol-Version")
377 .and_then(|v| v.to_str().ok())
378 {
379 inner.protocol_version = protocol_version.to_string();
380 }
381 if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
382 inner.session_id = Some(session_id.to_string());
383 }
384
385 if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
386 inner.session_id = None;
387 inner.abort_get_stream();
388 reinitialize_http_client(inner).await?;
389 continue;
390 }
391
392 if status == 401 {
393 return Err(VmError::Thrown(VmValue::String(Rc::from(
394 "MCP authorization required",
395 ))));
396 }
397
398 let body = response
399 .text()
400 .await
401 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
402
403 if body.trim().is_empty() {
404 if status < 400 {
405 ensure_http_get_stream(inner, server_name);
406 }
407 return Ok(serde_json::Value::Null);
408 }
409
410 let msg = parse_http_response_body(inner, server_name, &body, status, id).await?;
411
412 if status >= 400 {
413 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
414 }
415
416 ensure_http_get_stream(inner, server_name);
417 if id.is_none() {
418 return Ok(msg);
419 }
420 return parse_jsonrpc_result(msg);
421 }
422
423 Err(VmError::Runtime("MCP HTTP request failed".into()))
424}
425
426async fn send_http_request_once(
427 inner: &mut HttpMcpClientInner,
428 method: &str,
429 params: serde_json::Value,
430 id: Option<u64>,
431) -> Result<reqwest::Response, VmError> {
432 let mut payload = serde_json::json!({
433 "jsonrpc": "2.0",
434 "method": method,
435 "params": params,
436 });
437 if let Some(id) = id {
438 payload["id"] = serde_json::json!(id);
439 }
440 let payload = wrap_http_payload(payload, inner.proxy_server_name.as_deref());
441
442 let request = inner
443 .client
444 .post(&inner.url)
445 .header("Content-Type", "application/json")
446 .header("Accept", "application/json, text/event-stream")
447 .json(&payload);
448 let request = apply_http_headers(
449 request,
450 &inner.auth_token,
451 &inner.protocol_version,
452 inner.session_id.as_deref(),
453 );
454
455 request
456 .timeout(MCP_TIMEOUT)
457 .send()
458 .await
459 .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
460}
461
462fn ensure_http_get_stream(inner: &mut HttpMcpClientInner, server_name: &str) {
463 if server_name.is_empty() {
464 return;
465 }
466 if inner
467 .get_stream_task
468 .as_ref()
469 .is_some_and(|task| !task.is_finished())
470 {
471 return;
472 }
473
474 let config = HttpStreamConfig {
475 client: inner.client.clone(),
476 url: inner.url.clone(),
477 auth_token: inner.auth_token.clone(),
478 protocol_version: inner.protocol_version.clone(),
479 session_id: inner.session_id.clone(),
480 proxy_server_name: inner.proxy_server_name.clone(),
481 server_name: server_name.to_string(),
482 };
483 inner.get_stream_task = Some(tokio::task::spawn_local(run_http_get_stream(config)));
484}
485
/// Snapshot of the HTTP connection settings, used by code that posts replies to
/// server-initiated requests without borrowing the live client state.
#[derive(Clone)]
struct HttpStreamConfig {
    /// HTTP client to issue requests with.
    client: reqwest::Client,
    /// Server endpoint URL.
    url: String,
    /// Optional bearer token for the `Authorization` header.
    auth_token: Option<String>,
    /// Value for the `MCP-Protocol-Version` header.
    protocol_version: String,
    /// Value for the `MCP-Session-Id` header, if a session exists.
    session_id: Option<String>,
    /// When set, payloads are wrapped with a `serverName` field carrying this value.
    proxy_server_name: Option<String>,
    /// Server name passed to the inbound-request dispatchers.
    server_name: String,
}
496
497async fn run_http_get_stream(config: HttpStreamConfig) {
498 let request = apply_http_headers(
499 config
500 .client
501 .get(&config.url)
502 .header("Accept", "text/event-stream"),
503 &config.auth_token,
504 &config.protocol_version,
505 config.session_id.as_deref(),
506 );
507 let Ok(mut stream) = EventSource::new(request) else {
508 return;
509 };
510
511 while let Some(event) = stream.next().await {
512 match event {
513 Ok(SseEvent::Open) => {}
514 Ok(SseEvent::Message(message)) => {
515 if message.data.trim().is_empty() {
516 continue;
517 }
518 let Ok(msg) = serde_json::from_str::<serde_json::Value>(&message.data) else {
519 tracing::debug!("MCP HTTP GET stream received non-JSON event");
520 continue;
521 };
522 if let Some(response) =
523 handle_inbound_client_request(&config.server_name, &msg).await
524 {
525 let _ = post_http_jsonrpc_payload(&config, response).await;
526 }
527 }
528 Err(error) => {
529 tracing::debug!("MCP HTTP GET stream ended with error: {error}");
530 break;
531 }
532 }
533 }
534 stream.close();
535}
536
537async fn post_http_jsonrpc_payload(
538 config: &HttpStreamConfig,
539 payload: serde_json::Value,
540) -> Result<(), VmError> {
541 let payload = wrap_http_payload(payload, config.proxy_server_name.as_deref());
542 let request = config
543 .client
544 .post(&config.url)
545 .header("Content-Type", "application/json")
546 .header("Accept", "application/json, text/event-stream")
547 .json(&payload)
548 .timeout(MCP_TIMEOUT);
549 let request = apply_http_headers(
550 request,
551 &config.auth_token,
552 &config.protocol_version,
553 config.session_id.as_deref(),
554 );
555 let response = request
556 .send()
557 .await
558 .map_err(|e| VmError::Runtime(format!("MCP HTTP response POST error: {e}")))?;
559 if response.status().is_success() {
560 Ok(())
561 } else {
562 Err(VmError::Runtime(format!(
563 "MCP HTTP response POST returned {}",
564 response.status()
565 )))
566 }
567}
568
569fn apply_http_headers(
570 mut request: reqwest::RequestBuilder,
571 auth_token: &Option<String>,
572 protocol_version: &str,
573 session_id: Option<&str>,
574) -> reqwest::RequestBuilder {
575 request = request.header("MCP-Protocol-Version", protocol_version);
576 if let Some(token) = auth_token {
577 request = request.header("Authorization", format!("Bearer {token}"));
578 }
579 if let Some(session_id) = session_id {
580 request = request.header("MCP-Session-Id", session_id);
581 }
582 request
583}
584
585fn wrap_http_payload(
586 payload: serde_json::Value,
587 proxy_server_name: Option<&str>,
588) -> serde_json::Value {
589 let Some(proxy_server_name) = proxy_server_name else {
590 return payload;
591 };
592 let mut wrapped = serde_json::Map::new();
593 wrapped.insert(
594 "serverName".to_string(),
595 serde_json::Value::String(proxy_server_name.to_string()),
596 );
597 if let Some(object) = payload.as_object() {
598 for (key, value) in object {
599 wrapped.insert(key.clone(), value.clone());
600 }
601 }
602 serde_json::Value::Object(wrapped)
603}
604
/// Re-runs the MCP handshake on an existing HTTP client: an `initialize` request followed by
/// the `notifications/initialized` notification.
///
/// Both responses update the stored protocol version and session id from their headers.
///
/// # Errors
/// Fails when either request cannot be sent or read, when the `initialize` response cannot
/// be parsed or reports an error, or when the notification is answered with an error status
/// and a non-empty body.
async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
    // The handshake request always uses the fixed id 0.
    let initialize = send_http_request_once(
        inner,
        "initialize",
        serde_json::json!({
            "protocolVersion": PROTOCOL_VERSION,
            "capabilities": {
                "elicitation": {},
                "roots": {
                    "listChanged": true,
                },
                "sampling": {},
            },
            "clientInfo": {
                "name": "harn",
                "version": env!("CARGO_PKG_VERSION"),
            }
        }),
        Some(0),
    )
    .await?;
    // Adopt whatever protocol version and session id the server announces.
    if let Some(protocol_version) = initialize
        .headers()
        .get("MCP-Protocol-Version")
        .and_then(|v| v.to_str().ok())
    {
        inner.protocol_version = protocol_version.to_string();
    }
    if let Some(session_id) = initialize
        .headers()
        .get("MCP-Session-Id")
        .and_then(|v| v.to_str().ok())
    {
        inner.session_id = Some(session_id.to_string());
    }
    let status = initialize.status().as_u16();
    let body = initialize
        .text()
        .await
        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
    // An empty server name is passed to the body parser here.
    let msg = parse_http_response_body(inner, "", &body, status, Some(0)).await?;
    if status >= 400 {
        return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
    }
    // The initialize result itself is discarded; only a JSON-RPC error matters.
    let _ = parse_jsonrpc_result(msg)?;
    let response = send_http_request_once(
        inner,
        "notifications/initialized",
        serde_json::json!({}),
        None,
    )
    .await?;
    let status = response.status().as_u16();
    if let Some(protocol_version) = response
        .headers()
        .get("MCP-Protocol-Version")
        .and_then(|v| v.to_str().ok())
    {
        inner.protocol_version = protocol_version.to_string();
    }
    if let Some(session_id) = response
        .headers()
        .get("MCP-Session-Id")
        .and_then(|v| v.to_str().ok())
    {
        inner.session_id = Some(session_id.to_string());
    }
    let body = response
        .text()
        .await
        .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
    // Success when the status is below 400; an empty body is also accepted whatever the status.
    if body.trim().is_empty() || status < 400 {
        return Ok(());
    }
    let msg = parse_http_response_body(inner, "", &body, status, None).await?;
    Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
}
682
683async fn parse_http_response_body(
684 inner: &HttpMcpClientInner,
685 server_name: &str,
686 body: &str,
687 status: u16,
688 request_id: Option<u64>,
689) -> Result<serde_json::Value, VmError> {
690 if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
691 return parse_sse_jsonrpc_body(inner, server_name, body, request_id).await;
692 }
693 serde_json::from_str(body).map_err(|e| {
694 VmError::Runtime(format!(
695 "MCP HTTP response parse error (status {status}): {e}"
696 ))
697 })
698}
699
/// Extracts the JSON-RPC response from a fully buffered server-sent-events body.
///
/// The body is split into events at blank lines; each event's `data:` lines are joined with
/// `\n`, and all other lines are ignored. Events are then processed in order:
/// * the response whose id equals `request_id` (with a `result` or `error`) is returned
///   immediately;
/// * server-initiated requests are dispatched and their replies POSTed back (post failures
///   are ignored);
/// * any other message with a `result` or `error` is remembered as a fallback.
///
/// # Errors
/// Returns `VmError::Runtime` when no matching response and no fallback was found.
async fn parse_sse_jsonrpc_body(
    inner: &HttpMcpClientInner,
    server_name: &str,
    body: &str,
    request_id: Option<u64>,
) -> Result<serde_json::Value, VmError> {
    let mut current_data = Vec::new();
    let mut messages = Vec::new();

    for line in body.lines() {
        // A blank line terminates the current event.
        if line.is_empty() {
            if !current_data.is_empty() {
                messages.push(current_data.join("\n"));
                current_data.clear();
            }
            continue;
        }
        if let Some(data) = line.strip_prefix("data:") {
            current_data.push(data.trim_start().to_string());
        }
    }
    // Flush a final event that was not followed by a blank line.
    if !current_data.is_empty() {
        messages.push(current_data.join("\n"));
    }

    // Snapshot of the connection settings, used to post replies to inbound requests.
    let config = HttpStreamConfig {
        client: inner.client.clone(),
        url: inner.url.clone(),
        auth_token: inner.auth_token.clone(),
        protocol_version: inner.protocol_version.clone(),
        session_id: inner.session_id.clone(),
        proxy_server_name: inner.proxy_server_name.clone(),
        server_name: server_name.to_string(),
    };

    let mut fallback = None;
    for message in messages {
        // Events that are not valid JSON are skipped.
        if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
            if request_id.is_some()
                && value["id"].as_u64() == request_id
                && (value.get("result").is_some() || value.get("error").is_some())
            {
                return Ok(value);
            }
            if let Some(response) = handle_inbound_client_request(server_name, &value).await {
                let _ = post_http_jsonrpc_payload(&config, response).await;
                continue;
            }
            // Keep the most recent response-shaped message in case nothing matches by id.
            if value.get("result").is_some() || value.get("error").is_some() {
                fallback = Some(value);
            }
        }
    }

    fallback.ok_or_else(|| {
        VmError::Runtime(
            "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
        )
    })
}
760
761fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
762 if let Some(error) = msg.get("error") {
763 return Err(jsonrpc_error_to_vm_error(error));
764 }
765 Ok(msg
766 .get("result")
767 .cloned()
768 .unwrap_or(serde_json::Value::Null))
769}
770
771fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
772 let message = error
773 .get("message")
774 .and_then(|v| v.as_str())
775 .unwrap_or("Unknown MCP error");
776 let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
777 VmError::Thrown(VmValue::String(Rc::from(format!(
778 "MCP error ({code}): {message}"
779 ))))
780}
781
782fn client_request_rejection(msg: &serde_json::Value) -> Option<serde_json::Value> {
783 let request_id = msg.get("id")?.clone();
784 let method = msg.get("method").and_then(|value| value.as_str())?;
785 crate::mcp_protocol::unsupported_latest_spec_method_response(request_id.clone(), method)
786 .or_else(|| {
787 Some(crate::jsonrpc::error_response(
788 request_id,
789 -32601,
790 &format!("Method not found: {method}"),
791 ))
792 })
793}
794
795fn harn_roots_list_response(id: serde_json::Value) -> serde_json::Value {
796 crate::jsonrpc::response(
797 id,
798 serde_json::json!({
799 "roots": current_mcp_roots()
800 .iter()
801 .map(McpRoot::protocol_json)
802 .collect::<Vec<_>>()
803 }),
804 )
805}
806
807pub(crate) fn current_mcp_roots() -> Vec<McpRoot> {
808 compact_root_paths(current_mcp_root_candidates())
809 .into_iter()
810 .filter_map(|path| {
811 let uri = url::Url::from_file_path(&path).ok()?.to_string();
812 Some(McpRoot {
813 name: root_display_name(&path),
814 path: path.to_string_lossy().into_owned(),
815 uri,
816 })
817 })
818 .collect()
819}
820
821fn current_mcp_root_candidates() -> Vec<PathBuf> {
822 let mut candidates = Vec::new();
823 if let Some(context) = crate::stdlib::process::current_execution_context() {
824 if let Some(path) = non_empty_path(context.worktree_path.as_deref()) {
825 candidates.push(path);
826 }
827 if let Some(cwd) = non_empty_path(context.cwd.as_deref()) {
828 push_project_root_or_path(&mut candidates, cwd);
829 }
830 if let Some(source_dir) = non_empty_path(context.source_dir.as_deref()) {
831 push_project_root_or_path(&mut candidates, source_dir);
832 }
833 } else {
834 push_project_root_or_path(
835 &mut candidates,
836 crate::stdlib::process::execution_root_path(),
837 );
838 push_project_root_or_path(&mut candidates, crate::stdlib::process::source_root_path());
839 }
840
841 if candidates.is_empty() {
842 candidates.push(crate::stdlib::process::execution_root_path());
843 }
844 candidates
845}
846
/// Converts an optional string to a path, treating `None` and blank (whitespace-only)
/// strings as absent. The original text is kept untrimmed.
fn non_empty_path(raw: Option<&str>) -> Option<PathBuf> {
    match raw {
        Some(text) if !text.trim().is_empty() => Some(PathBuf::from(text)),
        _ => None,
    }
}
851
852fn push_project_root_or_path(candidates: &mut Vec<PathBuf>, path: PathBuf) {
853 let normalized = crate::stdlib::process::normalize_context_path(&path);
854 match crate::stdlib::process::find_project_root(&normalized) {
855 Some(root) => candidates.push(root),
856 None => candidates.push(normalized),
857 }
858}
859
860fn compact_root_paths(paths: Vec<PathBuf>) -> Vec<PathBuf> {
861 let mut normalized = paths
862 .into_iter()
863 .map(normalize_root_path)
864 .collect::<Vec<_>>();
865 normalized.sort_by_key(|path| {
866 (
867 path.components().count(),
868 path.to_string_lossy().to_string(),
869 )
870 });
871
872 let mut roots: Vec<PathBuf> = Vec::new();
873 for path in normalized {
874 if roots
875 .iter()
876 .any(|existing| path == *existing || path.starts_with(existing))
877 {
878 continue;
879 }
880 roots.push(path);
881 }
882 roots
883}
884
885fn normalize_root_path(path: PathBuf) -> PathBuf {
886 let absolute = crate::stdlib::process::normalize_context_path(&path);
887 std::fs::canonicalize(&absolute).unwrap_or(absolute)
888}
889
/// Returns a human-readable name for a root: its final path component when that exists and
/// is valid, non-empty UTF-8, otherwise the whole path as displayed.
fn root_display_name(path: &Path) -> String {
    match path.file_name().and_then(|name| name.to_str()) {
        Some(name) if !name.is_empty() => name.to_string(),
        _ => path.display().to_string(),
    }
}
897
898async fn mcp_connect_stdio_impl(
899 command: &str,
900 args: &[String],
901 env: &BTreeMap<String, String>,
902) -> Result<VmMcpClientHandle, VmError> {
903 let mut cmd = tokio::process::Command::new(command);
904 cmd.args(args)
905 .stdin(std::process::Stdio::piped())
906 .stdout(std::process::Stdio::piped())
907 .stderr(std::process::Stdio::inherit())
908 .envs(env);
909
910 let mut child = cmd.spawn().map_err(|e| {
911 VmError::Thrown(VmValue::String(Rc::from(format!(
912 "mcp_connect: failed to spawn '{command}': {e}"
913 ))))
914 })?;
915
916 let stdin = child
917 .stdin
918 .take()
919 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
920 let stdout = child
921 .stdout
922 .take()
923 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
924
925 let handle = VmMcpClientHandle {
926 name: command.to_string(),
927 inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
928 StdioMcpClientInner {
929 child,
930 stdin,
931 reader: BufReader::new(stdout),
932 next_id: 1,
933 },
934 )))),
935 last_roots: Arc::new(Mutex::new(Vec::new())),
936 initialize_result: Arc::new(Mutex::new(None)),
937 };
938
939 initialize_client(&handle).await?;
940 Ok(handle)
941}
942
943async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
944 let client = reqwest::Client::builder()
945 .build()
946 .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
947
948 let handle = VmMcpClientHandle {
949 name: spec.name.clone(),
950 inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
951 client,
952 url: spec.url.clone(),
953 auth_token: spec.auth_token.clone(),
954 protocol_version: spec
955 .protocol_version
956 .clone()
957 .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
958 session_id: None,
959 next_id: 1,
960 proxy_server_name: spec.proxy_server_name.clone(),
961 get_stream_task: None,
962 })))),
963 last_roots: Arc::new(Mutex::new(Vec::new())),
964 initialize_result: Arc::new(Mutex::new(None)),
965 };
966
967 initialize_client(&handle).await?;
968 Ok(handle)
969}
970
971async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
972 let initialize_result = handle
973 .call(
974 "initialize",
975 serde_json::json!({
976 "protocolVersion": PROTOCOL_VERSION,
977 "capabilities": {
978 "elicitation": {},
979 "roots": {
980 "listChanged": true,
981 },
982 "sampling": {},
983 },
984 "clientInfo": {
985 "name": "harn",
986 "version": env!("CARGO_PKG_VERSION"),
987 }
988 }),
989 )
990 .await?;
991 *handle.initialize_result.lock().await = Some(initialize_result);
992
993 handle
994 .notify("notifications/initialized", serde_json::json!({}))
995 .await?;
996
997 Ok(())
998}
999
1000pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
1001 match val {
1002 VmValue::String(s) => serde_json::Value::String(s.to_string()),
1003 VmValue::Int(n) => serde_json::json!(*n),
1004 VmValue::Float(n) => serde_json::json!(*n),
1005 VmValue::Bool(b) => serde_json::Value::Bool(*b),
1006 VmValue::Nil => serde_json::Value::Null,
1007 VmValue::List(items) => {
1008 serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
1009 }
1010 VmValue::Dict(map) => {
1011 let obj: serde_json::Map<String, serde_json::Value> = map
1012 .iter()
1013 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1014 .collect();
1015 serde_json::Value::Object(obj)
1016 }
1017 _ => serde_json::Value::Null,
1018 }
1019}
1020
1021fn extract_content_text(result: &serde_json::Value) -> String {
1022 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
1023 let texts: Vec<&str> = content
1024 .iter()
1025 .filter_map(|item| {
1026 if item.get("type").and_then(|t| t.as_str()) == Some("text") {
1027 item.get("text").and_then(|t| t.as_str())
1028 } else {
1029 None
1030 }
1031 })
1032 .collect();
1033 if texts.is_empty() {
1034 json_to_vm_value(result).display()
1035 } else {
1036 texts.join("\n")
1037 }
1038 } else {
1039 json_to_vm_value(result).display()
1040 }
1041}
1042
1043pub(crate) async fn call_mcp_tool(
1044 client: &VmMcpClientHandle,
1045 tool_name: &str,
1046 arguments: serde_json::Value,
1047) -> Result<serde_json::Value, VmError> {
1048 let result = client
1049 .call(
1050 "tools/call",
1051 serde_json::json!({
1052 "name": tool_name,
1053 "arguments": arguments,
1054 }),
1055 )
1056 .await?;
1057
1058 if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
1059 let error_text = extract_content_text(&result);
1060 return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
1061 }
1062
1063 let content = result
1064 .get("content")
1065 .and_then(|c| c.as_array())
1066 .cloned()
1067 .unwrap_or_default();
1068
1069 if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
1070 if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
1071 return Ok(serde_json::Value::String(text.to_string()));
1072 }
1073 }
1074
1075 if content.is_empty() {
1076 Ok(serde_json::Value::Null)
1077 } else {
1078 Ok(serde_json::Value::Array(content))
1079 }
1080}
1081
1082pub async fn connect_mcp_server(
1083 name: &str,
1084 command: &str,
1085 args: &[String],
1086) -> Result<VmMcpClientHandle, VmError> {
1087 let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
1088 handle.name = name.to_string();
1089 Ok(handle)
1090}
1091
1092pub async fn connect_mcp_server_from_spec(
1093 spec: &McpServerSpec,
1094) -> Result<VmMcpClientHandle, VmError> {
1095 let mut handle = match spec.transport {
1096 McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
1097 McpTransport::Http => mcp_connect_http_impl(spec).await?,
1098 };
1099 handle.name = spec.name.clone();
1100 Ok(handle)
1101}
1102
1103pub async fn connect_mcp_server_from_json(
1104 value: &serde_json::Value,
1105) -> Result<VmMcpClientHandle, VmError> {
1106 let spec: McpServerSpec = serde_json::from_value(value.clone())
1107 .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
1108 connect_mcp_server_from_spec(&spec).await
1109}
1110
1111pub fn register_mcp_builtins(vm: &mut Vm) {
1112 vm.register_builtin("mcp_roots", mcp_roots_builtin);
1113 vm.register_builtin("harn.mcp.roots", mcp_roots_builtin);
1114 register_harn_mcp_namespace(vm);
1115
1116 vm.register_async_builtin("mcp_connect", |args| async move {
1117 let command = args.first().map(|a| a.display()).unwrap_or_default();
1118 if command.is_empty() {
1119 return Err(VmError::Thrown(VmValue::String(Rc::from(
1120 "mcp_connect: command is required",
1121 ))));
1122 }
1123
1124 let cmd_args: Vec<String> = match args.get(1) {
1125 Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
1126 _ => Vec::new(),
1127 };
1128
1129 let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
1130 Ok(VmValue::McpClient(handle))
1131 });
1132
1133 vm.register_async_builtin("mcp_ensure_active", |args| async move {
1137 let name = match args.first() {
1138 Some(VmValue::String(s)) => s.to_string(),
1139 Some(other) => other.display(),
1140 None => String::new(),
1141 };
1142 if name.is_empty() {
1143 return Err(VmError::Thrown(VmValue::String(Rc::from(
1144 "mcp_ensure_active: server name is required",
1145 ))));
1146 }
1147 let handle = crate::mcp_registry::ensure_active(&name).await?;
1148 Ok(VmValue::McpClient(handle))
1149 });
1150
1151 vm.register_builtin("mcp_release", |args, _out| {
1155 let name = match args.first() {
1156 Some(VmValue::String(s)) => s.to_string(),
1157 Some(other) => other.display(),
1158 None => {
1159 return Err(VmError::Thrown(VmValue::String(Rc::from(
1160 "mcp_release: server name is required",
1161 ))));
1162 }
1163 };
1164 crate::mcp_registry::release(&name);
1165 Ok(VmValue::Nil)
1166 });
1167
1168 vm.register_builtin("mcp_registry_status", |_args, _out| {
1172 let mut out = Vec::new();
1173 for entry in crate::mcp_registry::snapshot_status() {
1174 let mut dict = BTreeMap::new();
1175 dict.insert(
1176 "name".to_string(),
1177 VmValue::String(Rc::from(entry.name.as_str())),
1178 );
1179 dict.insert("lazy".to_string(), VmValue::Bool(entry.lazy));
1180 dict.insert("active".to_string(), VmValue::Bool(entry.active));
1181 dict.insert(
1182 "ref_count".to_string(),
1183 VmValue::Int(entry.ref_count as i64),
1184 );
1185 if let Some(card) = entry.card {
1186 dict.insert("card".to_string(), VmValue::String(Rc::from(card.as_str())));
1187 }
1188 out.push(VmValue::Dict(Rc::new(dict)));
1189 }
1190 Ok(VmValue::List(Rc::new(out)))
1191 });
1192
1193 vm.register_async_builtin("mcp_server_card", |args| async move {
1200 let target = match args.first() {
1201 Some(VmValue::String(s)) => s.to_string(),
1202 Some(other) => other.display(),
1203 None => {
1204 return Err(VmError::Thrown(VmValue::String(Rc::from(
1205 "mcp_server_card: server name, URL, or path is required",
1206 ))));
1207 }
1208 };
1209
1210 let source = if target.starts_with("http://")
1216 || target.starts_with("https://")
1217 || target.contains('/')
1218 || target.contains('\\')
1219 || target.ends_with(".json")
1220 {
1221 target.clone()
1222 } else {
1223 match crate::mcp_registry::get_registration(&target) {
1224 Some(reg) => match reg.card {
1225 Some(card) => card,
1226 None => {
1227 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1228 "mcp_server_card: server '{target}' has no 'card' field in harn.toml"
1229 )))));
1230 }
1231 },
1232 None => {
1233 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1234 "mcp_server_card: no MCP server '{target}' registered (check harn.toml) \
1235 — pass a URL or path directly instead"
1236 )))));
1237 }
1238 }
1239 };
1240
1241 let card = crate::mcp_card::fetch_server_card(&source, None)
1242 .await
1243 .map_err(|e| {
1244 VmError::Thrown(VmValue::String(Rc::from(format!("mcp_server_card: {e}"))))
1245 })?;
1246 Ok(json_to_vm_value(&card))
1247 });
1248
1249 vm.register_async_builtin("mcp_list_tools", |args| async move {
1250 let client = match args.first() {
1251 Some(VmValue::McpClient(c)) => c.clone(),
1252 _ => {
1253 return Err(VmError::Thrown(VmValue::String(Rc::from(
1254 "mcp_list_tools: argument must be an MCP client",
1255 ))));
1256 }
1257 };
1258
1259 let result = client.call("tools/list", serde_json::json!({})).await?;
1260 let mut tools = result
1261 .get("tools")
1262 .and_then(|t| t.as_array())
1263 .cloned()
1264 .unwrap_or_default();
1265
1266 let server_name = client.name.clone();
1271 for tool in tools.iter_mut() {
1272 if let Some(obj) = tool.as_object_mut() {
1273 obj.entry("_mcp_server")
1274 .or_insert_with(|| serde_json::Value::String(server_name.clone()));
1275 }
1276 }
1277
1278 let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
1279 Ok(VmValue::List(Rc::new(vm_tools)))
1280 });
1281
1282 vm.register_async_builtin("mcp_call", |args| async move {
1283 let client = match args.first() {
1284 Some(VmValue::McpClient(c)) => c.clone(),
1285 _ => {
1286 return Err(VmError::Thrown(VmValue::String(Rc::from(
1287 "mcp_call: first argument must be an MCP client",
1288 ))));
1289 }
1290 };
1291
1292 let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
1293 if tool_name.is_empty() {
1294 return Err(VmError::Thrown(VmValue::String(Rc::from(
1295 "mcp_call: tool name is required",
1296 ))));
1297 }
1298
1299 let arguments = match args.get(2) {
1300 Some(VmValue::Dict(d)) => {
1301 let obj: serde_json::Map<String, serde_json::Value> = d
1302 .iter()
1303 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1304 .collect();
1305 serde_json::Value::Object(obj)
1306 }
1307 _ => serde_json::json!({}),
1308 };
1309
1310 Ok(json_to_vm_value(
1311 &call_mcp_tool(&client, &tool_name, arguments).await?,
1312 ))
1313 });
1314
1315 vm.register_async_builtin("mcp_server_info", |args| async move {
1316 let client = match args.first() {
1317 Some(VmValue::McpClient(c)) => c.clone(),
1318 _ => {
1319 return Err(VmError::Thrown(VmValue::String(Rc::from(
1320 "mcp_server_info: argument must be an MCP client",
1321 ))));
1322 }
1323 };
1324
1325 let guard = client.inner.lock().await;
1326 if guard.is_none() {
1327 return Err(VmError::Runtime("MCP client is disconnected".into()));
1328 }
1329 drop(guard);
1330
1331 let mut info = BTreeMap::new();
1332 info.insert(
1333 "name".to_string(),
1334 VmValue::String(Rc::from(client.name.as_str())),
1335 );
1336 info.insert("connected".to_string(), VmValue::Bool(true));
1337 let initialize = client
1338 .initialize_result
1339 .lock()
1340 .await
1341 .clone()
1342 .unwrap_or(serde_json::Value::Null);
1343 if !initialize.is_null() {
1344 if let Some(instructions) = initialize
1345 .get("instructions")
1346 .or_else(|| {
1347 initialize
1348 .get("serverInfo")
1349 .and_then(|value| value.get("instructions"))
1350 })
1351 .and_then(|value| value.as_str())
1352 .filter(|value| !value.is_empty())
1353 {
1354 info.insert(
1355 "instructions".to_string(),
1356 VmValue::String(Rc::from(instructions)),
1357 );
1358 }
1359 info.insert("initialize".to_string(), json_to_vm_value(&initialize));
1360 }
1361 Ok(VmValue::Dict(Rc::new(info)))
1362 });
1363
1364 vm.register_async_builtin("mcp_disconnect", |args| async move {
1365 let client = match args.first() {
1366 Some(VmValue::McpClient(c)) => c.clone(),
1367 _ => {
1368 return Err(VmError::Thrown(VmValue::String(Rc::from(
1369 "mcp_disconnect: argument must be an MCP client",
1370 ))));
1371 }
1372 };
1373
1374 client.disconnect().await?;
1375 Ok(VmValue::Nil)
1376 });
1377
1378 vm.register_async_builtin("mcp_list_resources", |args| async move {
1379 let client = match args.first() {
1380 Some(VmValue::McpClient(c)) => c.clone(),
1381 _ => {
1382 return Err(VmError::Thrown(VmValue::String(Rc::from(
1383 "mcp_list_resources: argument must be an MCP client",
1384 ))));
1385 }
1386 };
1387
1388 let result = client.call("resources/list", serde_json::json!({})).await?;
1389 let resources = result
1390 .get("resources")
1391 .and_then(|r| r.as_array())
1392 .cloned()
1393 .unwrap_or_default();
1394
1395 let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
1396 Ok(VmValue::List(Rc::new(vm_resources)))
1397 });
1398
1399 vm.register_async_builtin("mcp_read_resource", |args| async move {
1400 let client = match args.first() {
1401 Some(VmValue::McpClient(c)) => c.clone(),
1402 _ => {
1403 return Err(VmError::Thrown(VmValue::String(Rc::from(
1404 "mcp_read_resource: first argument must be an MCP client",
1405 ))));
1406 }
1407 };
1408
1409 let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
1410 if uri.is_empty() {
1411 return Err(VmError::Thrown(VmValue::String(Rc::from(
1412 "mcp_read_resource: URI is required",
1413 ))));
1414 }
1415
1416 let result = client
1417 .call("resources/read", serde_json::json!({ "uri": uri }))
1418 .await?;
1419
1420 let contents = result
1421 .get("contents")
1422 .and_then(|c| c.as_array())
1423 .cloned()
1424 .unwrap_or_default();
1425
1426 if contents.len() == 1 {
1427 if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
1428 return Ok(VmValue::String(Rc::from(text)));
1429 }
1430 }
1431
1432 if contents.is_empty() {
1433 Ok(VmValue::Nil)
1434 } else {
1435 Ok(VmValue::List(Rc::new(
1436 contents.iter().map(json_to_vm_value).collect(),
1437 )))
1438 }
1439 });
1440
1441 vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
1442 let client = match args.first() {
1443 Some(VmValue::McpClient(c)) => c.clone(),
1444 _ => {
1445 return Err(VmError::Thrown(VmValue::String(Rc::from(
1446 "mcp_list_resource_templates: argument must be an MCP client",
1447 ))));
1448 }
1449 };
1450
1451 let result = client
1452 .call("resources/templates/list", serde_json::json!({}))
1453 .await?;
1454
1455 let templates = result
1456 .get("resourceTemplates")
1457 .and_then(|r| r.as_array())
1458 .cloned()
1459 .unwrap_or_default();
1460
1461 let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
1462 Ok(VmValue::List(Rc::new(vm_templates)))
1463 });
1464
1465 vm.register_async_builtin("mcp_list_prompts", |args| async move {
1466 let client = match args.first() {
1467 Some(VmValue::McpClient(c)) => c.clone(),
1468 _ => {
1469 return Err(VmError::Thrown(VmValue::String(Rc::from(
1470 "mcp_list_prompts: argument must be an MCP client",
1471 ))));
1472 }
1473 };
1474
1475 let result = client.call("prompts/list", serde_json::json!({})).await?;
1476
1477 let prompts = result
1478 .get("prompts")
1479 .and_then(|p| p.as_array())
1480 .cloned()
1481 .unwrap_or_default();
1482
1483 let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
1484 Ok(VmValue::List(Rc::new(vm_prompts)))
1485 });
1486
1487 vm.register_async_builtin("mcp_get_prompt", |args| async move {
1488 let client = match args.first() {
1489 Some(VmValue::McpClient(c)) => c.clone(),
1490 _ => {
1491 return Err(VmError::Thrown(VmValue::String(Rc::from(
1492 "mcp_get_prompt: first argument must be an MCP client",
1493 ))));
1494 }
1495 };
1496
1497 let name = args.get(1).map(|a| a.display()).unwrap_or_default();
1498 if name.is_empty() {
1499 return Err(VmError::Thrown(VmValue::String(Rc::from(
1500 "mcp_get_prompt: prompt name is required",
1501 ))));
1502 }
1503
1504 let arguments = match args.get(2) {
1505 Some(VmValue::Dict(d)) => {
1506 let obj: serde_json::Map<String, serde_json::Value> = d
1507 .iter()
1508 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1509 .collect();
1510 serde_json::Value::Object(obj)
1511 }
1512 _ => serde_json::json!({}),
1513 };
1514
1515 let result = client
1516 .call(
1517 "prompts/get",
1518 serde_json::json!({
1519 "name": name,
1520 "arguments": arguments,
1521 }),
1522 )
1523 .await?;
1524
1525 Ok(json_to_vm_value(&result))
1526 });
1527}
1528
1529fn mcp_roots_builtin(_args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
1530 Ok(VmValue::List(Rc::new(
1531 current_mcp_roots()
1532 .iter()
1533 .map(|root| json_to_vm_value(&root.script_json()))
1534 .collect(),
1535 )))
1536}
1537
1538fn register_harn_mcp_namespace(vm: &mut Vm) {
1539 let mcp_namespace = VmValue::Dict(Rc::new(BTreeMap::from([
1540 (
1541 "_namespace".to_string(),
1542 VmValue::String(Rc::from("harn.mcp")),
1543 ),
1544 (
1545 "roots".to_string(),
1546 VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1547 ),
1548 ])));
1549 vm.set_global(
1550 "harn",
1551 VmValue::Dict(Rc::new(BTreeMap::from([
1552 ("_namespace".to_string(), VmValue::String(Rc::from("harn"))),
1553 (
1554 "mcp_roots".to_string(),
1555 VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1556 ),
1557 ("mcp".to_string(), mcp_namespace),
1558 ]))),
1559 );
1560}
1561
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};
    use tokio::net::{TcpListener, TcpStream};
    use tokio::sync::mpsc;

    /// Cleanup guard for tests that install a thread execution context rooted
    /// in a temporary directory.
    ///
    /// On drop it resets the process state and removes `root`, so the cleanup
    /// also runs when an assertion panics instead of leaking the directory and
    /// leaving the execution context set.
    struct ExecutionContextCleanup {
        root: std::path::PathBuf,
    }

    impl Drop for ExecutionContextCleanup {
        fn drop(&mut self) {
            crate::stdlib::process::reset_process_state();
            let _ = std::fs::remove_dir_all(&self.root);
        }
    }

    /// Connecting over HTTP to a mock server that pushes an
    /// `elicitation/create` request on the GET stream should result in a
    /// `decline` response being POSTed back.
    #[tokio::test(flavor = "current_thread")]
    async fn http_get_stream_dispatches_inbound_elicitation_response() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (base_url, mut responses) = spawn_eliciting_http_mcp_server().await;
                let spec = McpServerSpec {
                    name: "mock-http".to_string(),
                    transport: McpTransport::Http,
                    command: String::new(),
                    args: Vec::new(),
                    env: BTreeMap::new(),
                    url: format!("{base_url}/mcp"),
                    auth_token: None,
                    protocol_version: None,
                    proxy_server_name: None,
                };

                let handle = connect_mcp_server_from_spec(&spec).await.unwrap();
                let response = tokio::time::timeout(MCP_TIMEOUT, responses.recv())
                    .await
                    .expect("timed out waiting for elicitation response POST")
                    .expect("mock server closed before receiving elicitation response");

                assert_eq!(response["id"], serde_json::json!(99));
                assert_eq!(
                    response["result"]["action"],
                    serde_json::json!("decline"),
                    "without a host bridge, inbound elicitation should decline cleanly"
                );
                handle.disconnect().await.unwrap();
            })
            .await;
    }

    /// Starts a mock HTTP MCP server on an ephemeral local port.
    ///
    /// Returns the base URL and a receiver that yields every JSON-RPC
    /// response/error body the client POSTs back (see
    /// `handle_mock_http_mcp_connection`).
    async fn spawn_eliciting_http_mcp_server(
    ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let (response_tx, response_rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            loop {
                let Ok((stream, _)) = listener.accept().await else {
                    break;
                };
                let response_tx = response_tx.clone();
                tokio::spawn(async move {
                    let _ = handle_mock_http_mcp_connection(stream, response_tx).await;
                });
            }
        });

        (format!("http://{addr}"), response_rx)
    }

    /// Starts a mock HTTP server that records every JSON request body it
    /// receives and answers each request with an empty `202 Accepted`.
    ///
    /// Returns the base URL and a receiver of the recorded request bodies.
    async fn spawn_recording_http_mcp_server(
    ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let (request_tx, request_rx) = mpsc::unbounded_channel();

        tokio::spawn(async move {
            loop {
                let Ok((mut stream, _)) = listener.accept().await else {
                    break;
                };
                let request_tx = request_tx.clone();
                tokio::spawn(async move {
                    let Ok((_request_line, _headers, body)) = read_http_request(&mut stream).await
                    else {
                        return;
                    };
                    // Bodies that are not valid JSON are not recorded.
                    if let Ok(request) = serde_json::from_slice::<serde_json::Value>(&body) {
                        let _ = request_tx.send(request);
                    }
                    let _ = write_http_empty(&mut stream, "202 Accepted").await;
                });
            }
        });

        (format!("http://{addr}"), request_rx)
    }

    /// Serves a single connection of the eliciting mock server.
    ///
    /// * `GET` requests receive an SSE stream with a priming event followed by
    ///   an `elicitation/create` request with id 99.
    /// * `initialize` is answered with a result and an `MCP-Session-Id` header.
    /// * `notifications/initialized` is answered with `202 Accepted`.
    /// * Bodies carrying `result` or `error` are forwarded to `response_tx`
    ///   after checking the session header.
    /// * Any other request gets an empty JSON-RPC result.
    async fn handle_mock_http_mcp_connection(
        mut stream: TcpStream,
        response_tx: mpsc::UnboundedSender<serde_json::Value>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        let (request_line, headers, body) = read_http_request(&mut stream).await?;
        if request_line.starts_with("GET ") {
            let response = concat!(
                "HTTP/1.1 200 OK\r\n",
                "content-type: text/event-stream\r\n",
                "cache-control: no-cache\r\n",
                "\r\n",
                "id: prime\r\n",
                "data: \r\n",
                "\r\n",
                "id: elicit-1\r\n",
                "event: message\r\n",
                "data: {\"jsonrpc\":\"2.0\",\"id\":99,\"method\":\"elicitation/create\",\"params\":{\"message\":\"Need input\",\"requestedSchema\":{\"type\":\"object\",\"properties\":{}}}}\r\n",
                "\r\n"
            );
            stream.write_all(response.as_bytes()).await?;
            // Keep the stream open briefly before the connection is dropped.
            tokio::time::sleep(std::time::Duration::from_millis(250)).await;
            return Ok(());
        }

        let request: serde_json::Value = serde_json::from_slice(&body)?;
        let method = request.get("method").and_then(|value| value.as_str());
        match method {
            Some("initialize") => {
                write_http_json(
                    &mut stream,
                    "200 OK",
                    &[("MCP-Session-Id", "test-session")],
                    serde_json::json!({
                        "jsonrpc": "2.0",
                        "id": request["id"].clone(),
                        "result": {
                            "protocolVersion": PROTOCOL_VERSION,
                            "capabilities": {
                                "elicitation": {},
                                "tools": {}
                            },
                            "serverInfo": {
                                "name": "mock",
                                "version": "0.0.0"
                            }
                        }
                    }),
                )
                .await?;
            }
            Some("notifications/initialized") => {
                write_http_empty(&mut stream, "202 Accepted").await?;
            }
            // A body with `result` or `error` is the client's reply to a
            // server-initiated request.
            _ if request.get("result").is_some() || request.get("error").is_some() => {
                assert_eq!(
                    headers.get("mcp-session-id").map(String::as_str),
                    Some("test-session")
                );
                let _ = response_tx.send(request);
                write_http_empty(&mut stream, "202 Accepted").await?;
            }
            _ => {
                write_http_json(
                    &mut stream,
                    "200 OK",
                    &[],
                    serde_json::json!({
                        "jsonrpc": "2.0",
                        "id": request["id"].clone(),
                        "result": {}
                    }),
                )
                .await?;
            }
        }
        Ok(())
    }

    /// Reads one HTTP/1.1 request from `stream`.
    ///
    /// Returns the request line, the headers keyed by lowercased name, and the
    /// body truncated to the declared `content-length` (0 when the header is
    /// absent or unparsable). Fails when the header terminator is never seen.
    async fn read_http_request(
        stream: &mut TcpStream,
    ) -> Result<(String, BTreeMap<String, String>, Vec<u8>), Box<dyn std::error::Error + Send + Sync>>
    {
        // Read until the blank line that ends the header section (or EOF).
        let mut buffer = Vec::new();
        loop {
            let mut chunk = [0; 1024];
            let bytes = stream.read(&mut chunk).await?;
            if bytes == 0 {
                break;
            }
            buffer.extend_from_slice(&chunk[..bytes]);
            if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
                break;
            }
        }
        let header_end = buffer
            .windows(4)
            .position(|window| window == b"\r\n\r\n")
            .ok_or("missing HTTP header terminator")?;
        let header_text = String::from_utf8(buffer[..header_end].to_vec())?;
        let mut lines = header_text.lines();
        let request_line = lines.next().unwrap_or_default().to_string();
        let mut headers = BTreeMap::new();
        for line in lines {
            if let Some((name, value)) = line.split_once(':') {
                headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
            }
        }
        let content_length = headers
            .get("content-length")
            .and_then(|value| value.parse::<usize>().ok())
            .unwrap_or(0);
        // Part of the body may already have arrived with the headers; read the
        // remainder until `content_length` bytes are available or EOF.
        let mut body = buffer[header_end + 4..].to_vec();
        while body.len() < content_length {
            let mut chunk = vec![0; content_length - body.len()];
            let bytes = stream.read(&mut chunk).await?;
            if bytes == 0 {
                break;
            }
            body.extend_from_slice(&chunk[..bytes]);
        }
        body.truncate(content_length);
        Ok((request_line, headers, body))
    }

    /// Writes an HTTP response with the given status line, a JSON body, and
    /// any extra `headers`.
    async fn write_http_json(
        stream: &mut TcpStream,
        status: &str,
        headers: &[(&str, &str)],
        body: serde_json::Value,
    ) -> Result<(), std::io::Error> {
        let body = serde_json::to_string(&body).unwrap();
        let mut response = format!(
            "HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\n",
            body.len()
        );
        for (name, value) in headers {
            response.push_str(name);
            response.push_str(": ");
            response.push_str(value);
            response.push_str("\r\n");
        }
        response.push_str("\r\n");
        response.push_str(&body);
        stream.write_all(response.as_bytes()).await
    }

    /// Writes a body-less HTTP response with the given status line.
    async fn write_http_empty(stream: &mut TcpStream, status: &str) -> Result<(), std::io::Error> {
        let response = format!("HTTP/1.1 {status}\r\ncontent-length: 0\r\n\r\n");
        stream.write_all(response.as_bytes()).await
    }

    #[test]
    fn test_vm_value_to_serde_string() {
        let val = VmValue::String(Rc::from("hello"));
        let json = vm_value_to_serde(&val);
        assert_eq!(json, serde_json::json!("hello"));
    }

    #[test]
    fn test_vm_value_to_serde_dict() {
        let mut map = BTreeMap::new();
        map.insert("key".to_string(), VmValue::Int(42));
        let val = VmValue::Dict(Rc::new(map));
        let json = vm_value_to_serde(&val);
        assert_eq!(json, serde_json::json!({"key": 42}));
    }

    #[test]
    fn test_vm_value_to_serde_list() {
        let val = VmValue::List(Rc::new(vec![VmValue::Int(1), VmValue::Int(2)]));
        let json = vm_value_to_serde(&val);
        assert_eq!(json, serde_json::json!([1, 2]));
    }

    #[test]
    fn test_extract_content_text_single() {
        let result = serde_json::json!({
            "content": [{"type": "text", "text": "hello world"}],
            "isError": false
        });
        assert_eq!(extract_content_text(&result), "hello world");
    }

    #[test]
    fn test_extract_content_text_multiple() {
        let result = serde_json::json!({
            "content": [
                {"type": "text", "text": "first"},
                {"type": "text", "text": "second"}
            ],
            "isError": false
        });
        assert_eq!(extract_content_text(&result), "first\nsecond");
    }

    #[test]
    fn test_extract_content_text_fallback_json() {
        let result = serde_json::json!({
            "content": [{"type": "image", "data": "abc"}],
            "isError": false
        });
        let output = extract_content_text(&result);
        assert!(output.contains("image"));
    }

    /// An SSE body holding a notification followed by the response for id 1
    /// should parse to that response.
    #[tokio::test(flavor = "current_thread")]
    async fn test_parse_sse_jsonrpc_body_uses_matching_jsonrpc_response() {
        let inner = HttpMcpClientInner {
            client: reqwest::Client::new(),
            url: "http://127.0.0.1/mcp".to_string(),
            auth_token: None,
            protocol_version: PROTOCOL_VERSION.to_string(),
            session_id: None,
            next_id: 1,
            proxy_server_name: None,
            get_stream_task: None,
        };
        let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
        let parsed = parse_sse_jsonrpc_body(&inner, "mock", body, Some(1))
            .await
            .unwrap();
        assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
    }

    #[test]
    fn client_rejects_unadvertised_server_to_client_requests() {
        let unknown = client_request_rejection(&serde_json::json!({
            "jsonrpc": "2.0",
            "id": "custom-1",
            "method": "custom/method",
            "params": {}
        }))
        .expect("rejection");
        assert_eq!(unknown["error"]["code"], serde_json::json!(-32601));
        assert!(unknown["error"].get("data").is_none());
    }

    /// With `cwd` and `source_dir` pointing at a child of a directory that
    /// contains `harn.toml`, the single reported root is that parent
    /// directory.
    #[test]
    fn current_mcp_roots_prefers_project_root_over_child_cwd() {
        let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
        let child = root.join("nested");
        std::fs::create_dir_all(&child).unwrap();
        // From here on, cleanup runs even if a later step panics.
        let _cleanup = ExecutionContextCleanup { root: root.clone() };
        std::fs::write(root.join("harn.toml"), "[package]\nname = \"roots\"\n").unwrap();

        crate::stdlib::process::set_thread_execution_context(Some(
            crate::orchestration::RunExecutionRecord {
                cwd: Some(child.to_string_lossy().into_owned()),
                source_dir: Some(child.to_string_lossy().into_owned()),
                ..Default::default()
            },
        ));

        let roots = current_mcp_roots();
        let expected_root = std::fs::canonicalize(&root).unwrap();
        assert_eq!(roots.len(), 1);
        assert_eq!(roots[0].path, expected_root.to_string_lossy());
        assert!(roots[0].uri.starts_with("file://"));
        assert_eq!(
            roots[0].name,
            expected_root.file_name().unwrap().to_string_lossy()
        );
    }

    /// An inbound `roots/list` request is answered with one root whose URI is
    /// the file URL of the canonicalized working directory.
    #[tokio::test(flavor = "current_thread")]
    async fn handle_inbound_routes_roots_list() {
        let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
        std::fs::create_dir_all(&root).unwrap();
        // From here on, cleanup runs even if a later step panics.
        let _cleanup = ExecutionContextCleanup { root: root.clone() };
        crate::stdlib::process::set_thread_execution_context(Some(
            crate::orchestration::RunExecutionRecord {
                cwd: Some(root.to_string_lossy().into_owned()),
                ..Default::default()
            },
        ));

        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": "roots-1",
            "method": crate::mcp_protocol::METHOD_ROOTS_LIST,
        });
        let response = handle_inbound_client_request("mock", &request)
            .await
            .expect("roots/list should produce a response");
        let expected_root = std::fs::canonicalize(&root).unwrap();
        assert_eq!(response["id"], serde_json::json!("roots-1"));
        assert_eq!(response["result"]["roots"].as_array().unwrap().len(), 1);
        assert_eq!(
            response["result"]["roots"][0]["uri"],
            serde_json::json!(url::Url::from_file_path(&expected_root)
                .unwrap()
                .to_string())
        );
    }

    /// The first `notify_roots_list_changed_if_needed` call sends the
    /// roots-changed notification; a second call against the same roots
    /// sends nothing.
    #[tokio::test(flavor = "current_thread")]
    async fn roots_list_changed_notification_is_sent_once_per_snapshot() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (base_url, mut requests) = spawn_recording_http_mcp_server().await;
                let handle = VmMcpClientHandle {
                    name: "mock-http".to_string(),
                    inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
                        client: reqwest::Client::new(),
                        url: format!("{base_url}/mcp"),
                        auth_token: None,
                        protocol_version: PROTOCOL_VERSION.to_string(),
                        session_id: None,
                        next_id: 1,
                        proxy_server_name: None,
                        get_stream_task: None,
                    })))),
                    last_roots: Arc::new(Mutex::new(Vec::new())),
                    initialize_result: Arc::new(Mutex::new(None)),
                };

                handle.notify_roots_list_changed_if_needed().await.unwrap();
                let notification = tokio::time::timeout(MCP_TIMEOUT, requests.recv())
                    .await
                    .expect("timed out waiting for roots notification")
                    .expect("mock server closed before notification");
                assert_eq!(
                    notification["method"],
                    serde_json::json!(crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION)
                );

                handle.notify_roots_list_changed_if_needed().await.unwrap();
                // A short wait that times out means no second request arrived.
                assert!(
                    tokio::time::timeout(std::time::Duration::from_millis(50), requests.recv())
                        .await
                        .is_err(),
                    "unchanged roots should not send another notification"
                );
            })
            .await;
    }

    /// An inbound sampling request produces an error response whose data type
    /// is `mcp.samplingDeclined`.
    #[tokio::test(flavor = "current_thread")]
    async fn handle_inbound_routes_sampling_to_dispatcher() {
        let request = serde_json::json!({
            "jsonrpc": "2.0",
            "id": 42,
            "method": crate::mcp_sampling::SAMPLING_METHOD,
            "params": {
                "messages": [
                    {"role": "user", "content": {"type": "text", "text": "ping"}}
                ],
                "maxTokens": 4,
            },
        });
        let response = handle_inbound_client_request("mock", &request)
            .await
            .expect("sampling should produce a response");
        assert_eq!(response["id"], serde_json::json!(42));
        assert_eq!(
            response["error"]["data"]["type"],
            serde_json::json!("mcp.samplingDeclined")
        );
    }
}