1use std::collections::BTreeMap;
6use std::path::{Path, PathBuf};
7use std::rc::Rc;
8use std::sync::Arc;
9
10use futures::StreamExt;
11use reqwest_eventsource::{Event as SseEvent, EventSource};
12use serde::Deserialize;
13use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
14use tokio::process::{Child, ChildStdin, ChildStdout};
15use tokio::sync::Mutex;
16
17use crate::stdlib::json_to_vm_value;
18use crate::value::{VmError, VmValue};
19use crate::vm::Vm;
20
21const PROTOCOL_VERSION: &str = "2025-11-25";
23
24const MCP_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
26
27#[derive(Clone, Debug, Deserialize)]
28#[serde(rename_all = "lowercase")]
29enum McpTransport {
30 Stdio,
31 Http,
32}
33
34#[derive(Clone, Debug, Deserialize)]
35pub struct McpServerSpec {
36 pub name: String,
37 #[serde(default = "default_transport")]
38 transport: McpTransport,
39 #[serde(default)]
40 pub command: String,
41 #[serde(default)]
42 pub args: Vec<String>,
43 #[serde(default)]
44 pub env: BTreeMap<String, String>,
45 #[serde(default)]
46 pub url: String,
47 #[serde(default)]
48 pub auth_token: Option<String>,
49 #[serde(default)]
50 pub protocol_version: Option<String>,
51 #[serde(default)]
52 pub proxy_server_name: Option<String>,
53}
54
55fn default_transport() -> McpTransport {
56 McpTransport::Stdio
57}
58
59enum McpClientInner {
61 Stdio(StdioMcpClientInner),
62 Http(HttpMcpClientInner),
63}
64
65struct StdioMcpClientInner {
66 child: Child,
67 stdin: ChildStdin,
68 reader: BufReader<ChildStdout>,
69 next_id: u64,
70}
71
72struct HttpMcpClientInner {
73 client: reqwest::Client,
74 url: String,
75 auth_token: Option<String>,
76 protocol_version: String,
77 session_id: Option<String>,
78 next_id: u64,
79 proxy_server_name: Option<String>,
80 get_stream_task: Option<tokio::task::JoinHandle<()>>,
81}
82
83#[derive(Clone, Debug, PartialEq, Eq)]
84pub(crate) struct McpRoot {
85 path: String,
86 uri: String,
87 name: String,
88}
89
90impl McpRoot {
91 fn protocol_json(&self) -> serde_json::Value {
92 serde_json::json!({
93 "uri": self.uri,
94 "name": self.name,
95 })
96 }
97
98 fn script_json(&self) -> serde_json::Value {
99 serde_json::json!({
100 "uri": self.uri,
101 "name": self.name,
102 "path": self.path,
103 })
104 }
105}
106
107impl HttpMcpClientInner {
108 fn abort_get_stream(&mut self) {
109 if let Some(task) = self.get_stream_task.take() {
110 task.abort();
111 }
112 }
113}
114
115impl Drop for StdioMcpClientInner {
116 fn drop(&mut self) {
117 let _ = self.child.start_kill();
118 }
119}
120
121impl Drop for HttpMcpClientInner {
122 fn drop(&mut self) {
123 self.abort_get_stream();
124 }
125}
126
127#[derive(Clone)]
129pub struct VmMcpClientHandle {
130 pub name: String,
131 inner: Arc<Mutex<Option<McpClientInner>>>,
132 last_roots: Arc<Mutex<Vec<McpRoot>>>,
133 pub(crate) initialize_result: Arc<Mutex<Option<serde_json::Value>>>,
134}
135
136impl std::fmt::Debug for VmMcpClientHandle {
137 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
138 write!(f, "McpClient({})", self.name)
139 }
140}
141
142impl VmMcpClientHandle {
143 pub(crate) async fn call(
144 &self,
145 method: &str,
146 params: serde_json::Value,
147 ) -> Result<serde_json::Value, VmError> {
148 if method != "initialize" {
149 self.notify_roots_list_changed_if_needed().await?;
150 }
151 let mut guard = self.inner.lock().await;
152 let inner = guard
153 .as_mut()
154 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
155
156 match inner {
157 McpClientInner::Stdio(inner) => stdio_call(inner, &self.name, method, params).await,
158 McpClientInner::Http(inner) => http_call(inner, &self.name, method, params).await,
159 }
160 }
161
162 async fn notify(&self, method: &str, params: serde_json::Value) -> Result<(), VmError> {
163 let mut guard = self.inner.lock().await;
164 let inner = guard
165 .as_mut()
166 .ok_or_else(|| VmError::Runtime("MCP client is disconnected".into()))?;
167
168 match inner {
169 McpClientInner::Stdio(inner) => stdio_notify(inner, method, params).await,
170 McpClientInner::Http(inner) => http_notify(inner, &self.name, method, params).await,
171 }
172 }
173
174 pub(crate) async fn disconnect(&self) -> Result<(), VmError> {
175 let mut guard = self.inner.lock().await;
176 if let Some(inner) = guard.take() {
177 match inner {
178 McpClientInner::Stdio(mut inner) => {
179 let _ = inner.child.kill().await;
180 }
181 McpClientInner::Http(mut inner) => {
182 inner.abort_get_stream();
183 }
184 }
185 }
186 Ok(())
187 }
188
189 async fn notify_roots_list_changed_if_needed(&self) -> Result<(), VmError> {
190 let roots = current_mcp_roots();
191 let mut last_roots = self.last_roots.lock().await;
192 if *last_roots == roots {
193 return Ok(());
194 }
195
196 self.notify(
197 crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION,
198 serde_json::json!({}),
199 )
200 .await?;
201 *last_roots = roots;
202 Ok(())
203 }
204}
205
206async fn stdio_call(
207 inner: &mut StdioMcpClientInner,
208 server_name: &str,
209 method: &str,
210 params: serde_json::Value,
211) -> Result<serde_json::Value, VmError> {
212 let id = inner.next_id;
213 inner.next_id += 1;
214
215 let request = serde_json::json!({
216 "jsonrpc": "2.0",
217 "id": id,
218 "method": method,
219 "params": params,
220 });
221
222 let line = serde_json::to_string(&request)
223 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
224 inner
225 .stdin
226 .write_all(line.as_bytes())
227 .await
228 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
229 inner
230 .stdin
231 .write_all(b"\n")
232 .await
233 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
234 inner
235 .stdin
236 .flush()
237 .await
238 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
239
240 let mut line_buf = String::new();
241 loop {
242 line_buf.clear();
243 let bytes_read = tokio::time::timeout(MCP_TIMEOUT, inner.reader.read_line(&mut line_buf))
244 .await
245 .map_err(|_| {
246 VmError::Runtime(format!(
247 "MCP: server did not respond to '{method}' within {}s",
248 MCP_TIMEOUT.as_secs()
249 ))
250 })?
251 .map_err(|e| VmError::Runtime(format!("MCP read error: {e}")))?;
252
253 if bytes_read == 0 {
254 return Err(VmError::Runtime("MCP: server closed connection".into()));
255 }
256
257 let trimmed = line_buf.trim();
258 if trimmed.is_empty() {
259 continue;
260 }
261
262 let msg: serde_json::Value = match serde_json::from_str(trimmed) {
263 Ok(v) => v,
264 Err(_) => continue,
265 };
266
267 if msg.get("id").is_none() {
268 continue;
269 }
270
271 if msg["id"].as_u64() == Some(id)
272 && (msg.get("result").is_some() || msg.get("error").is_some())
273 {
274 return parse_jsonrpc_result(msg);
275 }
276
277 let response = match handle_inbound_client_request(server_name, &msg).await {
278 Some(response) => response,
279 None => continue,
280 };
281 let line = serde_json::to_string(&response)
282 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
283 inner
284 .stdin
285 .write_all(line.as_bytes())
286 .await
287 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
288 inner
289 .stdin
290 .write_all(b"\n")
291 .await
292 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
293 inner
294 .stdin
295 .flush()
296 .await
297 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
298 }
299}
300
301async fn handle_inbound_client_request(
306 server_name: &str,
307 msg: &serde_json::Value,
308) -> Option<serde_json::Value> {
309 let method = msg.get("method").and_then(|value| value.as_str())?;
310 if method == crate::mcp_elicit::ELICITATION_METHOD {
311 return Some(crate::mcp_elicit::dispatch_inbound_elicitation(server_name, msg).await);
312 }
313 if method == crate::mcp_sampling::SAMPLING_METHOD {
314 return Some(crate::mcp_sampling::dispatch_inbound_sampling(server_name, msg).await);
315 }
316 if method == crate::mcp_protocol::METHOD_ROOTS_LIST {
317 let id = msg.get("id")?.clone();
318 return Some(harn_roots_list_response(id));
319 }
320 client_request_rejection(msg)
321}
322
323async fn stdio_notify(
324 inner: &mut StdioMcpClientInner,
325 method: &str,
326 params: serde_json::Value,
327) -> Result<(), VmError> {
328 let notification = serde_json::json!({
329 "jsonrpc": "2.0",
330 "method": method,
331 "params": params,
332 });
333
334 let line = serde_json::to_string(¬ification)
335 .map_err(|e| VmError::Runtime(format!("MCP serialization error: {e}")))?;
336 inner
337 .stdin
338 .write_all(line.as_bytes())
339 .await
340 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
341 inner
342 .stdin
343 .write_all(b"\n")
344 .await
345 .map_err(|e| VmError::Runtime(format!("MCP write error: {e}")))?;
346 inner
347 .stdin
348 .flush()
349 .await
350 .map_err(|e| VmError::Runtime(format!("MCP flush error: {e}")))?;
351 Ok(())
352}
353
354async fn http_call(
355 inner: &mut HttpMcpClientInner,
356 server_name: &str,
357 method: &str,
358 params: serde_json::Value,
359) -> Result<serde_json::Value, VmError> {
360 let id = inner.next_id;
361 inner.next_id += 1;
362 send_http_request(inner, server_name, method, params, Some(id)).await
363}
364
365async fn http_notify(
366 inner: &mut HttpMcpClientInner,
367 server_name: &str,
368 method: &str,
369 params: serde_json::Value,
370) -> Result<(), VmError> {
371 let _ = send_http_request(inner, server_name, method, params, None).await?;
372 Ok(())
373}
374
375async fn send_http_request(
376 inner: &mut HttpMcpClientInner,
377 server_name: &str,
378 method: &str,
379 params: serde_json::Value,
380 id: Option<u64>,
381) -> Result<serde_json::Value, VmError> {
382 for attempt in 0..2 {
383 let response = send_http_request_once(inner, method, params.clone(), id).await?;
384
385 let status = response.status().as_u16();
386 let headers = response.headers().clone();
387 if let Some(protocol_version) = headers
388 .get("MCP-Protocol-Version")
389 .and_then(|v| v.to_str().ok())
390 {
391 inner.protocol_version = protocol_version.to_string();
392 }
393 if let Some(session_id) = headers.get("MCP-Session-Id").and_then(|v| v.to_str().ok()) {
394 inner.session_id = Some(session_id.to_string());
395 }
396
397 if status == 404 && inner.session_id.is_some() && method != "initialize" && attempt == 0 {
398 inner.session_id = None;
399 inner.abort_get_stream();
400 reinitialize_http_client(inner).await?;
401 continue;
402 }
403
404 if status == 401 {
405 return Err(VmError::Thrown(VmValue::String(Rc::from(
406 "MCP authorization required",
407 ))));
408 }
409
410 let body = response
411 .text()
412 .await
413 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
414
415 if body.trim().is_empty() {
416 if status < 400 {
417 ensure_http_get_stream(inner, server_name);
418 }
419 return Ok(serde_json::Value::Null);
420 }
421
422 let msg = parse_http_response_body(inner, server_name, &body, status, id).await?;
423
424 if status >= 400 {
425 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
426 }
427
428 ensure_http_get_stream(inner, server_name);
429 if id.is_none() {
430 return Ok(msg);
431 }
432 return parse_jsonrpc_result(msg);
433 }
434
435 Err(VmError::Runtime("MCP HTTP request failed".into()))
436}
437
438async fn send_http_request_once(
439 inner: &mut HttpMcpClientInner,
440 method: &str,
441 params: serde_json::Value,
442 id: Option<u64>,
443) -> Result<reqwest::Response, VmError> {
444 let mut payload = serde_json::json!({
445 "jsonrpc": "2.0",
446 "method": method,
447 "params": params,
448 });
449 if let Some(id) = id {
450 payload["id"] = serde_json::json!(id);
451 }
452 let payload = wrap_http_payload(payload, inner.proxy_server_name.as_deref());
453
454 let request = inner
455 .client
456 .post(&inner.url)
457 .header("Content-Type", "application/json")
458 .header("Accept", "application/json, text/event-stream")
459 .json(&payload);
460 let request = apply_http_headers(
461 request,
462 &inner.auth_token,
463 &inner.protocol_version,
464 inner.session_id.as_deref(),
465 );
466
467 request
468 .timeout(MCP_TIMEOUT)
469 .send()
470 .await
471 .map_err(|e| VmError::Runtime(format!("MCP HTTP request error: {e}")))
472}
473
474fn ensure_http_get_stream(inner: &mut HttpMcpClientInner, server_name: &str) {
475 if server_name.is_empty() {
476 return;
477 }
478 if inner
479 .get_stream_task
480 .as_ref()
481 .is_some_and(|task| !task.is_finished())
482 {
483 return;
484 }
485
486 let config = HttpStreamConfig {
487 client: inner.client.clone(),
488 url: inner.url.clone(),
489 auth_token: inner.auth_token.clone(),
490 protocol_version: inner.protocol_version.clone(),
491 session_id: inner.session_id.clone(),
492 proxy_server_name: inner.proxy_server_name.clone(),
493 server_name: server_name.to_string(),
494 };
495 inner.get_stream_task = Some(tokio::task::spawn_local(run_http_get_stream(config)));
496}
497
498#[derive(Clone)]
499struct HttpStreamConfig {
500 client: reqwest::Client,
501 url: String,
502 auth_token: Option<String>,
503 protocol_version: String,
504 session_id: Option<String>,
505 proxy_server_name: Option<String>,
506 server_name: String,
507}
508
509async fn run_http_get_stream(config: HttpStreamConfig) {
510 let request = apply_http_headers(
511 config
512 .client
513 .get(&config.url)
514 .header("Accept", "text/event-stream"),
515 &config.auth_token,
516 &config.protocol_version,
517 config.session_id.as_deref(),
518 );
519 let Ok(mut stream) = EventSource::new(request) else {
520 return;
521 };
522
523 while let Some(event) = stream.next().await {
524 match event {
525 Ok(SseEvent::Open) => {}
526 Ok(SseEvent::Message(message)) => {
527 if message.data.trim().is_empty() {
528 continue;
529 }
530 let Ok(msg) = serde_json::from_str::<serde_json::Value>(&message.data) else {
531 tracing::debug!("MCP HTTP GET stream received non-JSON event");
532 continue;
533 };
534 if let Some(response) =
535 handle_inbound_client_request(&config.server_name, &msg).await
536 {
537 let _ = post_http_jsonrpc_payload(&config, response).await;
538 }
539 }
540 Err(error) => {
541 tracing::debug!("MCP HTTP GET stream ended with error: {error}");
542 break;
543 }
544 }
545 }
546 stream.close();
547}
548
549async fn post_http_jsonrpc_payload(
550 config: &HttpStreamConfig,
551 payload: serde_json::Value,
552) -> Result<(), VmError> {
553 let payload = wrap_http_payload(payload, config.proxy_server_name.as_deref());
554 let request = config
555 .client
556 .post(&config.url)
557 .header("Content-Type", "application/json")
558 .header("Accept", "application/json, text/event-stream")
559 .json(&payload)
560 .timeout(MCP_TIMEOUT);
561 let request = apply_http_headers(
562 request,
563 &config.auth_token,
564 &config.protocol_version,
565 config.session_id.as_deref(),
566 );
567 let response = request
568 .send()
569 .await
570 .map_err(|e| VmError::Runtime(format!("MCP HTTP response POST error: {e}")))?;
571 if response.status().is_success() {
572 Ok(())
573 } else {
574 Err(VmError::Runtime(format!(
575 "MCP HTTP response POST returned {}",
576 response.status()
577 )))
578 }
579}
580
581fn apply_http_headers(
582 mut request: reqwest::RequestBuilder,
583 auth_token: &Option<String>,
584 protocol_version: &str,
585 session_id: Option<&str>,
586) -> reqwest::RequestBuilder {
587 request = request.header("MCP-Protocol-Version", protocol_version);
588 if let Some(token) = auth_token {
589 request = request.header("Authorization", format!("Bearer {token}"));
590 }
591 if let Some(session_id) = session_id {
592 request = request.header("MCP-Session-Id", session_id);
593 }
594 request
595}
596
597fn wrap_http_payload(
598 payload: serde_json::Value,
599 proxy_server_name: Option<&str>,
600) -> serde_json::Value {
601 let Some(proxy_server_name) = proxy_server_name else {
602 return payload;
603 };
604 let mut wrapped = serde_json::Map::new();
605 wrapped.insert(
606 "serverName".to_string(),
607 serde_json::Value::String(proxy_server_name.to_string()),
608 );
609 if let Some(object) = payload.as_object() {
610 for (key, value) in object {
611 wrapped.insert(key.clone(), value.clone());
612 }
613 }
614 serde_json::Value::Object(wrapped)
615}
616
617async fn reinitialize_http_client(inner: &mut HttpMcpClientInner) -> Result<(), VmError> {
618 let initialize = send_http_request_once(
619 inner,
620 "initialize",
621 serde_json::json!({
622 "protocolVersion": PROTOCOL_VERSION,
623 "capabilities": {
624 "elicitation": {},
625 "roots": {
626 "listChanged": true,
627 },
628 "sampling": {},
629 },
630 "clientInfo": {
631 "name": "harn",
632 "version": env!("CARGO_PKG_VERSION"),
633 }
634 }),
635 Some(0),
636 )
637 .await?;
638 if let Some(protocol_version) = initialize
639 .headers()
640 .get("MCP-Protocol-Version")
641 .and_then(|v| v.to_str().ok())
642 {
643 inner.protocol_version = protocol_version.to_string();
644 }
645 if let Some(session_id) = initialize
646 .headers()
647 .get("MCP-Session-Id")
648 .and_then(|v| v.to_str().ok())
649 {
650 inner.session_id = Some(session_id.to_string());
651 }
652 let status = initialize.status().as_u16();
653 let body = initialize
654 .text()
655 .await
656 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
657 let msg = parse_http_response_body(inner, "", &body, status, Some(0)).await?;
658 if status >= 400 {
659 return Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)));
660 }
661 let _ = parse_jsonrpc_result(msg)?;
662 let response = send_http_request_once(
663 inner,
664 "notifications/initialized",
665 serde_json::json!({}),
666 None,
667 )
668 .await?;
669 let status = response.status().as_u16();
670 if let Some(protocol_version) = response
671 .headers()
672 .get("MCP-Protocol-Version")
673 .and_then(|v| v.to_str().ok())
674 {
675 inner.protocol_version = protocol_version.to_string();
676 }
677 if let Some(session_id) = response
678 .headers()
679 .get("MCP-Session-Id")
680 .and_then(|v| v.to_str().ok())
681 {
682 inner.session_id = Some(session_id.to_string());
683 }
684 let body = response
685 .text()
686 .await
687 .map_err(|e| VmError::Runtime(format!("MCP HTTP read error: {e}")))?;
688 if body.trim().is_empty() || status < 400 {
689 return Ok(());
690 }
691 let msg = parse_http_response_body(inner, "", &body, status, None).await?;
692 Err(jsonrpc_error_to_vm_error(msg.get("error").unwrap_or(&msg)))
693}
694
695async fn parse_http_response_body(
696 inner: &HttpMcpClientInner,
697 server_name: &str,
698 body: &str,
699 status: u16,
700 request_id: Option<u64>,
701) -> Result<serde_json::Value, VmError> {
702 if body.trim_start().starts_with("event:") || body.trim_start().starts_with("data:") {
703 return parse_sse_jsonrpc_body(inner, server_name, body, request_id).await;
704 }
705 serde_json::from_str(body).map_err(|e| {
706 VmError::Runtime(format!(
707 "MCP HTTP response parse error (status {status}): {e}"
708 ))
709 })
710}
711
712async fn parse_sse_jsonrpc_body(
713 inner: &HttpMcpClientInner,
714 server_name: &str,
715 body: &str,
716 request_id: Option<u64>,
717) -> Result<serde_json::Value, VmError> {
718 let mut current_data = Vec::new();
719 let mut messages = Vec::new();
720
721 for line in body.lines() {
722 if line.is_empty() {
723 if !current_data.is_empty() {
724 messages.push(current_data.join("\n"));
725 current_data.clear();
726 }
727 continue;
728 }
729 if let Some(data) = line.strip_prefix("data:") {
730 current_data.push(data.trim_start().to_string());
731 }
732 }
733 if !current_data.is_empty() {
734 messages.push(current_data.join("\n"));
735 }
736
737 let config = HttpStreamConfig {
738 client: inner.client.clone(),
739 url: inner.url.clone(),
740 auth_token: inner.auth_token.clone(),
741 protocol_version: inner.protocol_version.clone(),
742 session_id: inner.session_id.clone(),
743 proxy_server_name: inner.proxy_server_name.clone(),
744 server_name: server_name.to_string(),
745 };
746
747 let mut fallback = None;
748 for message in messages {
749 if let Ok(value) = serde_json::from_str::<serde_json::Value>(&message) {
750 if request_id.is_some()
751 && value["id"].as_u64() == request_id
752 && (value.get("result").is_some() || value.get("error").is_some())
753 {
754 return Ok(value);
755 }
756 if let Some(response) = handle_inbound_client_request(server_name, &value).await {
757 let _ = post_http_jsonrpc_payload(&config, response).await;
758 continue;
759 }
760 if value.get("result").is_some() || value.get("error").is_some() {
761 fallback = Some(value);
762 }
763 }
764 }
765
766 fallback.ok_or_else(|| {
767 VmError::Runtime(
768 "MCP HTTP response parse error: no JSON-RPC payload found in SSE stream".into(),
769 )
770 })
771}
772
773fn parse_jsonrpc_result(msg: serde_json::Value) -> Result<serde_json::Value, VmError> {
774 if let Some(error) = msg.get("error") {
775 return Err(jsonrpc_error_to_vm_error(error));
776 }
777 Ok(msg
778 .get("result")
779 .cloned()
780 .unwrap_or(serde_json::Value::Null))
781}
782
783fn jsonrpc_error_to_vm_error(error: &serde_json::Value) -> VmError {
784 let message = error
785 .get("message")
786 .and_then(|v| v.as_str())
787 .unwrap_or("Unknown MCP error");
788 let code = error.get("code").and_then(|v| v.as_i64()).unwrap_or(-1);
789 VmError::Thrown(VmValue::String(Rc::from(format!(
790 "MCP error ({code}): {message}"
791 ))))
792}
793
794fn client_request_rejection(msg: &serde_json::Value) -> Option<serde_json::Value> {
795 let request_id = msg.get("id")?.clone();
796 let method = msg.get("method").and_then(|value| value.as_str())?;
797 crate::mcp_protocol::unsupported_latest_spec_method_response(request_id.clone(), method)
798 .or_else(|| {
799 Some(crate::jsonrpc::error_response(
800 request_id,
801 -32601,
802 &format!("Method not found: {method}"),
803 ))
804 })
805}
806
807fn harn_roots_list_response(id: serde_json::Value) -> serde_json::Value {
808 crate::jsonrpc::response(
809 id,
810 serde_json::json!({
811 "roots": current_mcp_roots()
812 .iter()
813 .map(McpRoot::protocol_json)
814 .collect::<Vec<_>>()
815 }),
816 )
817}
818
819pub(crate) fn current_mcp_roots() -> Vec<McpRoot> {
820 compact_root_paths(current_mcp_root_candidates())
821 .into_iter()
822 .filter_map(|path| {
823 let uri = url::Url::from_file_path(&path).ok()?.to_string();
824 Some(McpRoot {
825 name: root_display_name(&path),
826 path: path.to_string_lossy().into_owned(),
827 uri,
828 })
829 })
830 .collect()
831}
832
833fn current_mcp_root_candidates() -> Vec<PathBuf> {
834 let mut candidates = Vec::new();
835 if let Some(context) = crate::stdlib::process::current_execution_context() {
836 if let Some(path) = non_empty_path(context.worktree_path.as_deref()) {
837 candidates.push(path);
838 }
839 if let Some(cwd) = non_empty_path(context.cwd.as_deref()) {
840 push_project_root_or_path(&mut candidates, cwd);
841 }
842 if let Some(source_dir) = non_empty_path(context.source_dir.as_deref()) {
843 push_project_root_or_path(&mut candidates, source_dir);
844 }
845 } else {
846 push_project_root_or_path(
847 &mut candidates,
848 crate::stdlib::process::execution_root_path(),
849 );
850 push_project_root_or_path(&mut candidates, crate::stdlib::process::source_root_path());
851 }
852
853 if candidates.is_empty() {
854 candidates.push(crate::stdlib::process::execution_root_path());
855 }
856 candidates
857}
858
859fn non_empty_path(raw: Option<&str>) -> Option<PathBuf> {
860 raw.filter(|path| !path.trim().is_empty())
861 .map(PathBuf::from)
862}
863
864fn push_project_root_or_path(candidates: &mut Vec<PathBuf>, path: PathBuf) {
865 let normalized = crate::stdlib::process::normalize_context_path(&path);
866 match crate::stdlib::process::find_project_root(&normalized) {
867 Some(root) => candidates.push(root),
868 None => candidates.push(normalized),
869 }
870}
871
872fn compact_root_paths(paths: Vec<PathBuf>) -> Vec<PathBuf> {
873 let mut normalized = paths
874 .into_iter()
875 .map(normalize_root_path)
876 .collect::<Vec<_>>();
877 normalized.sort_by_key(|path| {
878 (
879 path.components().count(),
880 path.to_string_lossy().to_string(),
881 )
882 });
883
884 let mut roots: Vec<PathBuf> = Vec::new();
885 for path in normalized {
886 if roots
887 .iter()
888 .any(|existing| path == *existing || path.starts_with(existing))
889 {
890 continue;
891 }
892 roots.push(path);
893 }
894 roots
895}
896
897fn normalize_root_path(path: PathBuf) -> PathBuf {
898 let absolute = crate::stdlib::process::normalize_context_path(&path);
899 std::fs::canonicalize(&absolute).unwrap_or(absolute)
900}
901
902fn root_display_name(path: &Path) -> String {
903 path.file_name()
904 .and_then(|name| name.to_str())
905 .filter(|name| !name.is_empty())
906 .map(str::to_string)
907 .unwrap_or_else(|| path.display().to_string())
908}
909
910async fn mcp_connect_stdio_impl(
911 command: &str,
912 args: &[String],
913 env: &BTreeMap<String, String>,
914) -> Result<VmMcpClientHandle, VmError> {
915 let mut cmd = tokio::process::Command::new(command);
916 cmd.args(args)
917 .stdin(std::process::Stdio::piped())
918 .stdout(std::process::Stdio::piped())
919 .stderr(std::process::Stdio::inherit())
920 .envs(env);
921 cmd.kill_on_drop(true);
922
923 let mut child = cmd.spawn().map_err(|e| {
924 VmError::Thrown(VmValue::String(Rc::from(format!(
925 "mcp_connect: failed to spawn '{command}': {e}"
926 ))))
927 })?;
928
929 let stdin = child
930 .stdin
931 .take()
932 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdin".into()))?;
933 let stdout = child
934 .stdout
935 .take()
936 .ok_or_else(|| VmError::Runtime("mcp_connect: failed to open stdout".into()))?;
937
938 let handle = VmMcpClientHandle {
939 name: command.to_string(),
940 inner: Arc::new(Mutex::new(Some(McpClientInner::Stdio(
941 StdioMcpClientInner {
942 child,
943 stdin,
944 reader: BufReader::new(stdout),
945 next_id: 1,
946 },
947 )))),
948 last_roots: Arc::new(Mutex::new(Vec::new())),
949 initialize_result: Arc::new(Mutex::new(None)),
950 };
951
952 initialize_client(&handle).await?;
953 Ok(handle)
954}
955
956async fn mcp_connect_http_impl(spec: &McpServerSpec) -> Result<VmMcpClientHandle, VmError> {
957 let client = reqwest::Client::builder()
958 .build()
959 .map_err(|e| VmError::Runtime(format!("MCP HTTP client error: {e}")))?;
960
961 let handle = VmMcpClientHandle {
962 name: spec.name.clone(),
963 inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
964 client,
965 url: spec.url.clone(),
966 auth_token: spec.auth_token.clone(),
967 protocol_version: spec
968 .protocol_version
969 .clone()
970 .unwrap_or_else(|| PROTOCOL_VERSION.to_string()),
971 session_id: None,
972 next_id: 1,
973 proxy_server_name: spec.proxy_server_name.clone(),
974 get_stream_task: None,
975 })))),
976 last_roots: Arc::new(Mutex::new(Vec::new())),
977 initialize_result: Arc::new(Mutex::new(None)),
978 };
979
980 initialize_client(&handle).await?;
981 Ok(handle)
982}
983
984async fn initialize_client(handle: &VmMcpClientHandle) -> Result<(), VmError> {
985 let initialize_result = handle
986 .call(
987 "initialize",
988 serde_json::json!({
989 "protocolVersion": PROTOCOL_VERSION,
990 "capabilities": {
991 "elicitation": {},
992 "roots": {
993 "listChanged": true,
994 },
995 "sampling": {},
996 },
997 "clientInfo": {
998 "name": "harn",
999 "version": env!("CARGO_PKG_VERSION"),
1000 }
1001 }),
1002 )
1003 .await?;
1004 *handle.initialize_result.lock().await = Some(initialize_result);
1005
1006 handle
1007 .notify("notifications/initialized", serde_json::json!({}))
1008 .await?;
1009
1010 Ok(())
1011}
1012
1013pub(crate) fn vm_value_to_serde(val: &VmValue) -> serde_json::Value {
1014 match val {
1015 VmValue::String(s) => serde_json::Value::String(s.to_string()),
1016 VmValue::Int(n) => serde_json::json!(*n),
1017 VmValue::Float(n) => serde_json::json!(*n),
1018 VmValue::Bool(b) => serde_json::Value::Bool(*b),
1019 VmValue::Nil => serde_json::Value::Null,
1020 VmValue::List(items) => {
1021 serde_json::Value::Array(items.iter().map(vm_value_to_serde).collect())
1022 }
1023 VmValue::Dict(map) => {
1024 let obj: serde_json::Map<String, serde_json::Value> = map
1025 .iter()
1026 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1027 .collect();
1028 serde_json::Value::Object(obj)
1029 }
1030 _ => serde_json::Value::Null,
1031 }
1032}
1033
1034fn extract_content_text(result: &serde_json::Value) -> String {
1035 if let Some(content) = result.get("content").and_then(|c| c.as_array()) {
1036 let texts: Vec<&str> = content
1037 .iter()
1038 .filter_map(|item| {
1039 if item.get("type").and_then(|t| t.as_str()) == Some("text") {
1040 item.get("text").and_then(|t| t.as_str())
1041 } else {
1042 None
1043 }
1044 })
1045 .collect();
1046 if texts.is_empty() {
1047 json_to_vm_value(result).display()
1048 } else {
1049 texts.join("\n")
1050 }
1051 } else {
1052 json_to_vm_value(result).display()
1053 }
1054}
1055
1056pub(crate) async fn call_mcp_tool(
1057 client: &VmMcpClientHandle,
1058 tool_name: &str,
1059 arguments: serde_json::Value,
1060) -> Result<serde_json::Value, VmError> {
1061 let result = client
1062 .call(
1063 "tools/call",
1064 serde_json::json!({
1065 "name": tool_name,
1066 "arguments": arguments,
1067 }),
1068 )
1069 .await?;
1070
1071 if result.get("isError").and_then(|v| v.as_bool()) == Some(true) {
1072 let error_text = extract_content_text(&result);
1073 return Err(VmError::Thrown(VmValue::String(Rc::from(error_text))));
1074 }
1075
1076 let content = result
1077 .get("content")
1078 .and_then(|c| c.as_array())
1079 .cloned()
1080 .unwrap_or_default();
1081
1082 if content.len() == 1 && content[0].get("type").and_then(|t| t.as_str()) == Some("text") {
1083 if let Some(text) = content[0].get("text").and_then(|t| t.as_str()) {
1084 return Ok(serde_json::Value::String(text.to_string()));
1085 }
1086 }
1087
1088 if content.is_empty() {
1089 Ok(serde_json::Value::Null)
1090 } else {
1091 Ok(serde_json::Value::Array(content))
1092 }
1093}
1094
1095pub async fn connect_mcp_server(
1096 name: &str,
1097 command: &str,
1098 args: &[String],
1099) -> Result<VmMcpClientHandle, VmError> {
1100 let mut handle = mcp_connect_stdio_impl(command, args, &BTreeMap::new()).await?;
1101 handle.name = name.to_string();
1102 Ok(handle)
1103}
1104
1105pub async fn connect_mcp_server_from_spec(
1106 spec: &McpServerSpec,
1107) -> Result<VmMcpClientHandle, VmError> {
1108 let mut handle = match spec.transport {
1109 McpTransport::Stdio => mcp_connect_stdio_impl(&spec.command, &spec.args, &spec.env).await?,
1110 McpTransport::Http => mcp_connect_http_impl(spec).await?,
1111 };
1112 handle.name = spec.name.clone();
1113 Ok(handle)
1114}
1115
1116pub async fn connect_mcp_server_from_json(
1117 value: &serde_json::Value,
1118) -> Result<VmMcpClientHandle, VmError> {
1119 let spec: McpServerSpec = serde_json::from_value(value.clone())
1120 .map_err(|e| VmError::Runtime(format!("Invalid MCP server config: {e}")))?;
1121 connect_mcp_server_from_spec(&spec).await
1122}
1123
1124pub fn register_mcp_builtins(vm: &mut Vm) {
1125 vm.register_builtin("mcp_roots", mcp_roots_builtin);
1126 vm.register_builtin("harn.mcp.roots", mcp_roots_builtin);
1127 register_harn_mcp_namespace(vm);
1128
1129 vm.register_async_builtin("mcp_connect", |args| async move {
1130 let command = args.first().map(|a| a.display()).unwrap_or_default();
1131 if command.is_empty() {
1132 return Err(VmError::Thrown(VmValue::String(Rc::from(
1133 "mcp_connect: command is required",
1134 ))));
1135 }
1136
1137 let cmd_args: Vec<String> = match args.get(1) {
1138 Some(VmValue::List(list)) => list.iter().map(|v| v.display()).collect(),
1139 _ => Vec::new(),
1140 };
1141
1142 let handle = mcp_connect_stdio_impl(&command, &cmd_args, &BTreeMap::new()).await?;
1143 Ok(VmValue::McpClient(handle))
1144 });
1145
1146 vm.register_async_builtin("mcp_ensure_active", |args| async move {
1150 let name = match args.first() {
1151 Some(VmValue::String(s)) => s.to_string(),
1152 Some(other) => other.display(),
1153 None => String::new(),
1154 };
1155 if name.is_empty() {
1156 return Err(VmError::Thrown(VmValue::String(Rc::from(
1157 "mcp_ensure_active: server name is required",
1158 ))));
1159 }
1160 let handle = crate::mcp_registry::ensure_active(&name).await?;
1161 Ok(VmValue::McpClient(handle))
1162 });
1163
1164 vm.register_builtin("mcp_release", |args, _out| {
1168 let name = match args.first() {
1169 Some(VmValue::String(s)) => s.to_string(),
1170 Some(other) => other.display(),
1171 None => {
1172 return Err(VmError::Thrown(VmValue::String(Rc::from(
1173 "mcp_release: server name is required",
1174 ))));
1175 }
1176 };
1177 crate::mcp_registry::release(&name);
1178 Ok(VmValue::Nil)
1179 });
1180
1181 vm.register_builtin("mcp_registry_status", |_args, _out| {
1185 let mut out = Vec::new();
1186 for entry in crate::mcp_registry::snapshot_status() {
1187 let mut dict = BTreeMap::new();
1188 dict.insert(
1189 "name".to_string(),
1190 VmValue::String(Rc::from(entry.name.as_str())),
1191 );
1192 dict.insert("lazy".to_string(), VmValue::Bool(entry.lazy));
1193 dict.insert("active".to_string(), VmValue::Bool(entry.active));
1194 dict.insert(
1195 "ref_count".to_string(),
1196 VmValue::Int(entry.ref_count as i64),
1197 );
1198 if let Some(card) = entry.card {
1199 dict.insert("card".to_string(), VmValue::String(Rc::from(card.as_str())));
1200 }
1201 out.push(VmValue::Dict(Rc::new(dict)));
1202 }
1203 Ok(VmValue::List(Rc::new(out)))
1204 });
1205
1206 vm.register_async_builtin("mcp_server_card", |args| async move {
1213 let target = match args.first() {
1214 Some(VmValue::String(s)) => s.to_string(),
1215 Some(other) => other.display(),
1216 None => {
1217 return Err(VmError::Thrown(VmValue::String(Rc::from(
1218 "mcp_server_card: server name, URL, or path is required",
1219 ))));
1220 }
1221 };
1222
1223 let source = if target.starts_with("http://")
1229 || target.starts_with("https://")
1230 || target.contains('/')
1231 || target.contains('\\')
1232 || target.ends_with(".json")
1233 {
1234 target.clone()
1235 } else {
1236 match crate::mcp_registry::get_registration(&target) {
1237 Some(reg) => match reg.card {
1238 Some(card) => card,
1239 None => {
1240 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1241 "mcp_server_card: server '{target}' has no 'card' field in harn.toml"
1242 )))));
1243 }
1244 },
1245 None => {
1246 return Err(VmError::Thrown(VmValue::String(Rc::from(format!(
1247 "mcp_server_card: no MCP server '{target}' registered (check harn.toml) \
1248 — pass a URL or path directly instead"
1249 )))));
1250 }
1251 }
1252 };
1253
1254 let card = crate::mcp_card::fetch_server_card(&source, None)
1255 .await
1256 .map_err(|e| {
1257 VmError::Thrown(VmValue::String(Rc::from(format!("mcp_server_card: {e}"))))
1258 })?;
1259 Ok(json_to_vm_value(&card))
1260 });
1261
1262 vm.register_async_builtin("mcp_list_tools", |args| async move {
1263 let client = match args.first() {
1264 Some(VmValue::McpClient(c)) => c.clone(),
1265 _ => {
1266 return Err(VmError::Thrown(VmValue::String(Rc::from(
1267 "mcp_list_tools: argument must be an MCP client",
1268 ))));
1269 }
1270 };
1271
1272 let result = client.call("tools/list", serde_json::json!({})).await?;
1273 let mut tools = result
1274 .get("tools")
1275 .and_then(|t| t.as_array())
1276 .cloned()
1277 .unwrap_or_default();
1278
1279 let server_name = client.name.clone();
1284 for tool in tools.iter_mut() {
1285 if let Some(obj) = tool.as_object_mut() {
1286 obj.entry("_mcp_server")
1287 .or_insert_with(|| serde_json::Value::String(server_name.clone()));
1288 }
1289 }
1290
1291 let vm_tools: Vec<VmValue> = tools.iter().map(json_to_vm_value).collect();
1292 Ok(VmValue::List(Rc::new(vm_tools)))
1293 });
1294
1295 vm.register_async_builtin("mcp_call", |args| async move {
1296 let client = match args.first() {
1297 Some(VmValue::McpClient(c)) => c.clone(),
1298 _ => {
1299 return Err(VmError::Thrown(VmValue::String(Rc::from(
1300 "mcp_call: first argument must be an MCP client",
1301 ))));
1302 }
1303 };
1304
1305 let tool_name = args.get(1).map(|a| a.display()).unwrap_or_default();
1306 if tool_name.is_empty() {
1307 return Err(VmError::Thrown(VmValue::String(Rc::from(
1308 "mcp_call: tool name is required",
1309 ))));
1310 }
1311
1312 let arguments = match args.get(2) {
1313 Some(VmValue::Dict(d)) => {
1314 let obj: serde_json::Map<String, serde_json::Value> = d
1315 .iter()
1316 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1317 .collect();
1318 serde_json::Value::Object(obj)
1319 }
1320 _ => serde_json::json!({}),
1321 };
1322
1323 Ok(json_to_vm_value(
1324 &call_mcp_tool(&client, &tool_name, arguments).await?,
1325 ))
1326 });
1327
1328 vm.register_async_builtin("mcp_server_info", |args| async move {
1329 let client = match args.first() {
1330 Some(VmValue::McpClient(c)) => c.clone(),
1331 _ => {
1332 return Err(VmError::Thrown(VmValue::String(Rc::from(
1333 "mcp_server_info: argument must be an MCP client",
1334 ))));
1335 }
1336 };
1337
1338 let guard = client.inner.lock().await;
1339 if guard.is_none() {
1340 return Err(VmError::Runtime("MCP client is disconnected".into()));
1341 }
1342 drop(guard);
1343
1344 let mut info = BTreeMap::new();
1345 info.insert(
1346 "name".to_string(),
1347 VmValue::String(Rc::from(client.name.as_str())),
1348 );
1349 info.insert("connected".to_string(), VmValue::Bool(true));
1350 let initialize = client
1351 .initialize_result
1352 .lock()
1353 .await
1354 .clone()
1355 .unwrap_or(serde_json::Value::Null);
1356 if !initialize.is_null() {
1357 if let Some(instructions) = initialize
1358 .get("instructions")
1359 .or_else(|| {
1360 initialize
1361 .get("serverInfo")
1362 .and_then(|value| value.get("instructions"))
1363 })
1364 .and_then(|value| value.as_str())
1365 .filter(|value| !value.is_empty())
1366 {
1367 info.insert(
1368 "instructions".to_string(),
1369 VmValue::String(Rc::from(instructions)),
1370 );
1371 }
1372 info.insert("initialize".to_string(), json_to_vm_value(&initialize));
1373 }
1374 Ok(VmValue::Dict(Rc::new(info)))
1375 });
1376
1377 vm.register_async_builtin("mcp_disconnect", |args| async move {
1378 let client = match args.first() {
1379 Some(VmValue::McpClient(c)) => c.clone(),
1380 _ => {
1381 return Err(VmError::Thrown(VmValue::String(Rc::from(
1382 "mcp_disconnect: argument must be an MCP client",
1383 ))));
1384 }
1385 };
1386
1387 client.disconnect().await?;
1388 Ok(VmValue::Nil)
1389 });
1390
1391 vm.register_async_builtin("mcp_list_resources", |args| async move {
1392 let client = match args.first() {
1393 Some(VmValue::McpClient(c)) => c.clone(),
1394 _ => {
1395 return Err(VmError::Thrown(VmValue::String(Rc::from(
1396 "mcp_list_resources: argument must be an MCP client",
1397 ))));
1398 }
1399 };
1400
1401 let result = client.call("resources/list", serde_json::json!({})).await?;
1402 let resources = result
1403 .get("resources")
1404 .and_then(|r| r.as_array())
1405 .cloned()
1406 .unwrap_or_default();
1407
1408 let vm_resources: Vec<VmValue> = resources.iter().map(json_to_vm_value).collect();
1409 Ok(VmValue::List(Rc::new(vm_resources)))
1410 });
1411
1412 vm.register_async_builtin("mcp_read_resource", |args| async move {
1413 let client = match args.first() {
1414 Some(VmValue::McpClient(c)) => c.clone(),
1415 _ => {
1416 return Err(VmError::Thrown(VmValue::String(Rc::from(
1417 "mcp_read_resource: first argument must be an MCP client",
1418 ))));
1419 }
1420 };
1421
1422 let uri = args.get(1).map(|a| a.display()).unwrap_or_default();
1423 if uri.is_empty() {
1424 return Err(VmError::Thrown(VmValue::String(Rc::from(
1425 "mcp_read_resource: URI is required",
1426 ))));
1427 }
1428
1429 let result = client
1430 .call("resources/read", serde_json::json!({ "uri": uri }))
1431 .await?;
1432
1433 let contents = result
1434 .get("contents")
1435 .and_then(|c| c.as_array())
1436 .cloned()
1437 .unwrap_or_default();
1438
1439 if contents.len() == 1 {
1440 if let Some(text) = contents[0].get("text").and_then(|t| t.as_str()) {
1441 return Ok(VmValue::String(Rc::from(text)));
1442 }
1443 }
1444
1445 if contents.is_empty() {
1446 Ok(VmValue::Nil)
1447 } else {
1448 Ok(VmValue::List(Rc::new(
1449 contents.iter().map(json_to_vm_value).collect(),
1450 )))
1451 }
1452 });
1453
1454 vm.register_async_builtin("mcp_list_resource_templates", |args| async move {
1455 let client = match args.first() {
1456 Some(VmValue::McpClient(c)) => c.clone(),
1457 _ => {
1458 return Err(VmError::Thrown(VmValue::String(Rc::from(
1459 "mcp_list_resource_templates: argument must be an MCP client",
1460 ))));
1461 }
1462 };
1463
1464 let result = client
1465 .call("resources/templates/list", serde_json::json!({}))
1466 .await?;
1467
1468 let templates = result
1469 .get("resourceTemplates")
1470 .and_then(|r| r.as_array())
1471 .cloned()
1472 .unwrap_or_default();
1473
1474 let vm_templates: Vec<VmValue> = templates.iter().map(json_to_vm_value).collect();
1475 Ok(VmValue::List(Rc::new(vm_templates)))
1476 });
1477
1478 vm.register_async_builtin("mcp_list_prompts", |args| async move {
1479 let client = match args.first() {
1480 Some(VmValue::McpClient(c)) => c.clone(),
1481 _ => {
1482 return Err(VmError::Thrown(VmValue::String(Rc::from(
1483 "mcp_list_prompts: argument must be an MCP client",
1484 ))));
1485 }
1486 };
1487
1488 let result = client.call("prompts/list", serde_json::json!({})).await?;
1489
1490 let prompts = result
1491 .get("prompts")
1492 .and_then(|p| p.as_array())
1493 .cloned()
1494 .unwrap_or_default();
1495
1496 let vm_prompts: Vec<VmValue> = prompts.iter().map(json_to_vm_value).collect();
1497 Ok(VmValue::List(Rc::new(vm_prompts)))
1498 });
1499
1500 vm.register_async_builtin("mcp_get_prompt", |args| async move {
1501 let client = match args.first() {
1502 Some(VmValue::McpClient(c)) => c.clone(),
1503 _ => {
1504 return Err(VmError::Thrown(VmValue::String(Rc::from(
1505 "mcp_get_prompt: first argument must be an MCP client",
1506 ))));
1507 }
1508 };
1509
1510 let name = args.get(1).map(|a| a.display()).unwrap_or_default();
1511 if name.is_empty() {
1512 return Err(VmError::Thrown(VmValue::String(Rc::from(
1513 "mcp_get_prompt: prompt name is required",
1514 ))));
1515 }
1516
1517 let arguments = match args.get(2) {
1518 Some(VmValue::Dict(d)) => {
1519 let obj: serde_json::Map<String, serde_json::Value> = d
1520 .iter()
1521 .map(|(k, v)| (k.clone(), vm_value_to_serde(v)))
1522 .collect();
1523 serde_json::Value::Object(obj)
1524 }
1525 _ => serde_json::json!({}),
1526 };
1527
1528 let result = client
1529 .call(
1530 "prompts/get",
1531 serde_json::json!({
1532 "name": name,
1533 "arguments": arguments,
1534 }),
1535 )
1536 .await?;
1537
1538 Ok(json_to_vm_value(&result))
1539 });
1540}
1541
1542fn mcp_roots_builtin(_args: &[VmValue], _out: &mut String) -> Result<VmValue, VmError> {
1543 Ok(VmValue::List(Rc::new(
1544 current_mcp_roots()
1545 .iter()
1546 .map(|root| json_to_vm_value(&root.script_json()))
1547 .collect(),
1548 )))
1549}
1550
1551fn register_harn_mcp_namespace(vm: &mut Vm) {
1552 let mcp_namespace = VmValue::Dict(Rc::new(BTreeMap::from([
1553 (
1554 "_namespace".to_string(),
1555 VmValue::String(Rc::from("harn.mcp")),
1556 ),
1557 (
1558 "roots".to_string(),
1559 VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1560 ),
1561 ])));
1562 vm.set_global(
1563 "harn",
1564 VmValue::Dict(Rc::new(BTreeMap::from([
1565 ("_namespace".to_string(), VmValue::String(Rc::from("harn"))),
1566 (
1567 "mcp_roots".to_string(),
1568 VmValue::BuiltinRef(Rc::from("harn.mcp.roots")),
1569 ),
1570 ("mcp".to_string(), mcp_namespace),
1571 ]))),
1572 );
1573}
1574
1575#[cfg(test)]
1576mod tests {
1577 use super::*;
1578 use tokio::io::{AsyncReadExt, AsyncWriteExt};
1579 use tokio::net::{TcpListener, TcpStream};
1580 use tokio::sync::mpsc;
1581
1582 #[tokio::test(flavor = "current_thread")]
1583 async fn http_get_stream_dispatches_inbound_elicitation_response() {
1584 tokio::task::LocalSet::new()
1585 .run_until(async {
1586 let (base_url, mut responses) = spawn_eliciting_http_mcp_server().await;
1587 let spec = McpServerSpec {
1588 name: "mock-http".to_string(),
1589 transport: McpTransport::Http,
1590 command: String::new(),
1591 args: Vec::new(),
1592 env: BTreeMap::new(),
1593 url: format!("{base_url}/mcp"),
1594 auth_token: None,
1595 protocol_version: None,
1596 proxy_server_name: None,
1597 };
1598
1599 let handle = connect_mcp_server_from_spec(&spec).await.unwrap();
1600 let response = tokio::time::timeout(MCP_TIMEOUT, responses.recv())
1601 .await
1602 .expect("timed out waiting for elicitation response POST")
1603 .expect("mock server closed before receiving elicitation response");
1604
1605 assert_eq!(response["id"], serde_json::json!(99));
1606 assert_eq!(
1607 response["result"]["action"],
1608 serde_json::json!("decline"),
1609 "without a host bridge, inbound elicitation should decline cleanly"
1610 );
1611 handle.disconnect().await.unwrap();
1612 })
1613 .await;
1614 }
1615
1616 async fn spawn_eliciting_http_mcp_server(
1617 ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
1618 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1619 let addr = listener.local_addr().unwrap();
1620 let (response_tx, response_rx) = mpsc::unbounded_channel();
1621
1622 tokio::spawn(async move {
1623 loop {
1624 let Ok((stream, _)) = listener.accept().await else {
1625 break;
1626 };
1627 let response_tx = response_tx.clone();
1628 tokio::spawn(async move {
1629 let _ = handle_mock_http_mcp_connection(stream, response_tx).await;
1630 });
1631 }
1632 });
1633
1634 (format!("http://{addr}"), response_rx)
1635 }
1636
1637 async fn spawn_recording_http_mcp_server(
1638 ) -> (String, mpsc::UnboundedReceiver<serde_json::Value>) {
1639 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
1640 let addr = listener.local_addr().unwrap();
1641 let (request_tx, request_rx) = mpsc::unbounded_channel();
1642
1643 tokio::spawn(async move {
1644 loop {
1645 let Ok((mut stream, _)) = listener.accept().await else {
1646 break;
1647 };
1648 let request_tx = request_tx.clone();
1649 tokio::spawn(async move {
1650 let Ok((_request_line, _headers, body)) = read_http_request(&mut stream).await
1651 else {
1652 return;
1653 };
1654 if let Ok(request) = serde_json::from_slice::<serde_json::Value>(&body) {
1655 let _ = request_tx.send(request);
1656 }
1657 let _ = write_http_empty(&mut stream, "202 Accepted").await;
1658 });
1659 }
1660 });
1661
1662 (format!("http://{addr}"), request_rx)
1663 }
1664
1665 async fn handle_mock_http_mcp_connection(
1666 mut stream: TcpStream,
1667 response_tx: mpsc::UnboundedSender<serde_json::Value>,
1668 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1669 let (request_line, headers, body) = read_http_request(&mut stream).await?;
1670 if request_line.starts_with("GET ") {
1671 let response = concat!(
1672 "HTTP/1.1 200 OK\r\n",
1673 "content-type: text/event-stream\r\n",
1674 "cache-control: no-cache\r\n",
1675 "\r\n",
1676 "id: prime\r\n",
1677 "data: \r\n",
1678 "\r\n",
1679 "id: elicit-1\r\n",
1680 "event: message\r\n",
1681 "data: {\"jsonrpc\":\"2.0\",\"id\":99,\"method\":\"elicitation/create\",\"params\":{\"message\":\"Need input\",\"requestedSchema\":{\"type\":\"object\",\"properties\":{}}}}\r\n",
1682 "\r\n"
1683 );
1684 stream.write_all(response.as_bytes()).await?;
1685 tokio::time::sleep(std::time::Duration::from_millis(250)).await;
1686 return Ok(());
1687 }
1688
1689 let request: serde_json::Value = serde_json::from_slice(&body)?;
1690 let method = request.get("method").and_then(|value| value.as_str());
1691 match method {
1692 Some("initialize") => {
1693 write_http_json(
1694 &mut stream,
1695 "200 OK",
1696 &[("MCP-Session-Id", "test-session")],
1697 serde_json::json!({
1698 "jsonrpc": "2.0",
1699 "id": request["id"].clone(),
1700 "result": {
1701 "protocolVersion": PROTOCOL_VERSION,
1702 "capabilities": {
1703 "elicitation": {},
1704 "tools": {}
1705 },
1706 "serverInfo": {
1707 "name": "mock",
1708 "version": "0.0.0"
1709 }
1710 }
1711 }),
1712 )
1713 .await?;
1714 }
1715 Some("notifications/initialized") => {
1716 write_http_empty(&mut stream, "202 Accepted").await?;
1717 }
1718 _ if request.get("result").is_some() || request.get("error").is_some() => {
1719 assert_eq!(
1720 headers.get("mcp-session-id").map(String::as_str),
1721 Some("test-session")
1722 );
1723 let _ = response_tx.send(request);
1724 write_http_empty(&mut stream, "202 Accepted").await?;
1725 }
1726 _ => {
1727 write_http_json(
1728 &mut stream,
1729 "200 OK",
1730 &[],
1731 serde_json::json!({
1732 "jsonrpc": "2.0",
1733 "id": request["id"].clone(),
1734 "result": {}
1735 }),
1736 )
1737 .await?;
1738 }
1739 }
1740 Ok(())
1741 }
1742
1743 async fn read_http_request(
1744 stream: &mut TcpStream,
1745 ) -> Result<(String, BTreeMap<String, String>, Vec<u8>), Box<dyn std::error::Error + Send + Sync>>
1746 {
1747 let mut buffer = Vec::new();
1748 loop {
1749 let mut chunk = [0; 1024];
1750 let bytes = stream.read(&mut chunk).await?;
1751 if bytes == 0 {
1752 break;
1753 }
1754 buffer.extend_from_slice(&chunk[..bytes]);
1755 if buffer.windows(4).any(|window| window == b"\r\n\r\n") {
1756 break;
1757 }
1758 }
1759 let header_end = buffer
1760 .windows(4)
1761 .position(|window| window == b"\r\n\r\n")
1762 .ok_or("missing HTTP header terminator")?;
1763 let header_text = String::from_utf8(buffer[..header_end].to_vec())?;
1764 let mut lines = header_text.lines();
1765 let request_line = lines.next().unwrap_or_default().to_string();
1766 let mut headers = BTreeMap::new();
1767 for line in lines {
1768 if let Some((name, value)) = line.split_once(':') {
1769 headers.insert(name.trim().to_ascii_lowercase(), value.trim().to_string());
1770 }
1771 }
1772 let content_length = headers
1773 .get("content-length")
1774 .and_then(|value| value.parse::<usize>().ok())
1775 .unwrap_or(0);
1776 let mut body = buffer[header_end + 4..].to_vec();
1777 while body.len() < content_length {
1778 let mut chunk = vec![0; content_length - body.len()];
1779 let bytes = stream.read(&mut chunk).await?;
1780 if bytes == 0 {
1781 break;
1782 }
1783 body.extend_from_slice(&chunk[..bytes]);
1784 }
1785 body.truncate(content_length);
1786 Ok((request_line, headers, body))
1787 }
1788
1789 async fn write_http_json(
1790 stream: &mut TcpStream,
1791 status: &str,
1792 headers: &[(&str, &str)],
1793 body: serde_json::Value,
1794 ) -> Result<(), std::io::Error> {
1795 let body = serde_json::to_string(&body).unwrap();
1796 let mut response = format!(
1797 "HTTP/1.1 {status}\r\ncontent-type: application/json\r\ncontent-length: {}\r\n",
1798 body.len()
1799 );
1800 for (name, value) in headers {
1801 response.push_str(name);
1802 response.push_str(": ");
1803 response.push_str(value);
1804 response.push_str("\r\n");
1805 }
1806 response.push_str("\r\n");
1807 response.push_str(&body);
1808 stream.write_all(response.as_bytes()).await
1809 }
1810
1811 async fn write_http_empty(stream: &mut TcpStream, status: &str) -> Result<(), std::io::Error> {
1812 let response = format!("HTTP/1.1 {status}\r\ncontent-length: 0\r\n\r\n");
1813 stream.write_all(response.as_bytes()).await
1814 }
1815
1816 #[test]
1817 fn test_vm_value_to_serde_string() {
1818 let val = VmValue::String(Rc::from("hello"));
1819 let json = vm_value_to_serde(&val);
1820 assert_eq!(json, serde_json::json!("hello"));
1821 }
1822
1823 #[test]
1824 fn test_vm_value_to_serde_dict() {
1825 let mut map = BTreeMap::new();
1826 map.insert("key".to_string(), VmValue::Int(42));
1827 let val = VmValue::Dict(Rc::new(map));
1828 let json = vm_value_to_serde(&val);
1829 assert_eq!(json, serde_json::json!({"key": 42}));
1830 }
1831
1832 #[test]
1833 fn test_vm_value_to_serde_list() {
1834 let val = VmValue::List(Rc::new(vec![VmValue::Int(1), VmValue::Int(2)]));
1835 let json = vm_value_to_serde(&val);
1836 assert_eq!(json, serde_json::json!([1, 2]));
1837 }
1838
1839 #[test]
1840 fn test_extract_content_text_single() {
1841 let result = serde_json::json!({
1842 "content": [{"type": "text", "text": "hello world"}],
1843 "isError": false
1844 });
1845 assert_eq!(extract_content_text(&result), "hello world");
1846 }
1847
1848 #[test]
1849 fn test_extract_content_text_multiple() {
1850 let result = serde_json::json!({
1851 "content": [
1852 {"type": "text", "text": "first"},
1853 {"type": "text", "text": "second"}
1854 ],
1855 "isError": false
1856 });
1857 assert_eq!(extract_content_text(&result), "first\nsecond");
1858 }
1859
1860 #[test]
1861 fn test_extract_content_text_fallback_json() {
1862 let result = serde_json::json!({
1863 "content": [{"type": "image", "data": "abc"}],
1864 "isError": false
1865 });
1866 let output = extract_content_text(&result);
1867 assert!(output.contains("image"));
1868 }
1869
1870 #[tokio::test(flavor = "current_thread")]
1871 async fn test_parse_sse_jsonrpc_body_uses_matching_jsonrpc_response() {
1872 let inner = HttpMcpClientInner {
1873 client: reqwest::Client::new(),
1874 url: "http://127.0.0.1/mcp".to_string(),
1875 auth_token: None,
1876 protocol_version: PROTOCOL_VERSION.to_string(),
1877 session_id: None,
1878 next_id: 1,
1879 proxy_server_name: None,
1880 get_stream_task: None,
1881 };
1882 let body = "event: message\ndata: {\"jsonrpc\":\"2.0\",\"method\":\"notifications/message\"}\n\nevent: message\ndata: {\"jsonrpc\":\"2.0\",\"id\":1,\"result\":{\"tools\":[]}}\n\n";
1883 let parsed = parse_sse_jsonrpc_body(&inner, "mock", body, Some(1))
1884 .await
1885 .unwrap();
1886 assert_eq!(parsed["result"]["tools"], serde_json::json!([]));
1887 }
1888
1889 #[test]
1890 fn client_rejects_unadvertised_server_to_client_requests() {
1891 let unknown = client_request_rejection(&serde_json::json!({
1892 "jsonrpc": "2.0",
1893 "id": "custom-1",
1894 "method": "custom/method",
1895 "params": {}
1896 }))
1897 .expect("rejection");
1898 assert_eq!(unknown["error"]["code"], serde_json::json!(-32601));
1899 assert!(unknown["error"].get("data").is_none());
1900 }
1901
1902 #[test]
1903 fn current_mcp_roots_prefers_project_root_over_child_cwd() {
1904 let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
1905 let child = root.join("nested");
1906 std::fs::create_dir_all(&child).unwrap();
1907 std::fs::write(root.join("harn.toml"), "[package]\nname = \"roots\"\n").unwrap();
1908
1909 crate::stdlib::process::set_thread_execution_context(Some(
1910 crate::orchestration::RunExecutionRecord {
1911 cwd: Some(child.to_string_lossy().into_owned()),
1912 source_dir: Some(child.to_string_lossy().into_owned()),
1913 ..Default::default()
1914 },
1915 ));
1916
1917 let roots = current_mcp_roots();
1918 let expected_root = std::fs::canonicalize(&root).unwrap();
1919 assert_eq!(roots.len(), 1);
1920 assert_eq!(roots[0].path, expected_root.to_string_lossy());
1921 assert!(roots[0].uri.starts_with("file://"));
1922 assert_eq!(
1923 roots[0].name,
1924 expected_root.file_name().unwrap().to_string_lossy()
1925 );
1926
1927 crate::stdlib::process::reset_process_state();
1928 let _ = std::fs::remove_dir_all(&root);
1929 }
1930
1931 #[tokio::test(flavor = "current_thread")]
1932 async fn handle_inbound_routes_roots_list() {
1933 let root = std::env::temp_dir().join(format!("harn-mcp-roots-{}", uuid::Uuid::now_v7()));
1934 std::fs::create_dir_all(&root).unwrap();
1935 crate::stdlib::process::set_thread_execution_context(Some(
1936 crate::orchestration::RunExecutionRecord {
1937 cwd: Some(root.to_string_lossy().into_owned()),
1938 ..Default::default()
1939 },
1940 ));
1941
1942 let request = serde_json::json!({
1943 "jsonrpc": "2.0",
1944 "id": "roots-1",
1945 "method": crate::mcp_protocol::METHOD_ROOTS_LIST,
1946 });
1947 let response = handle_inbound_client_request("mock", &request)
1948 .await
1949 .expect("roots/list should produce a response");
1950 let expected_root = std::fs::canonicalize(&root).unwrap();
1951 assert_eq!(response["id"], serde_json::json!("roots-1"));
1952 assert_eq!(response["result"]["roots"].as_array().unwrap().len(), 1);
1953 assert_eq!(
1954 response["result"]["roots"][0]["uri"],
1955 serde_json::json!(url::Url::from_file_path(&expected_root)
1956 .unwrap()
1957 .to_string())
1958 );
1959
1960 crate::stdlib::process::reset_process_state();
1961 let _ = std::fs::remove_dir_all(&root);
1962 }
1963
1964 #[tokio::test(flavor = "current_thread")]
1965 async fn roots_list_changed_notification_is_sent_once_per_snapshot() {
1966 tokio::task::LocalSet::new()
1967 .run_until(async {
1968 let (base_url, mut requests) = spawn_recording_http_mcp_server().await;
1969 let handle = VmMcpClientHandle {
1970 name: "mock-http".to_string(),
1971 inner: Arc::new(Mutex::new(Some(McpClientInner::Http(HttpMcpClientInner {
1972 client: reqwest::Client::new(),
1973 url: format!("{base_url}/mcp"),
1974 auth_token: None,
1975 protocol_version: PROTOCOL_VERSION.to_string(),
1976 session_id: None,
1977 next_id: 1,
1978 proxy_server_name: None,
1979 get_stream_task: None,
1980 })))),
1981 last_roots: Arc::new(Mutex::new(Vec::new())),
1982 initialize_result: Arc::new(Mutex::new(None)),
1983 };
1984
1985 handle.notify_roots_list_changed_if_needed().await.unwrap();
1986 let notification = tokio::time::timeout(MCP_TIMEOUT, requests.recv())
1987 .await
1988 .expect("timed out waiting for roots notification")
1989 .expect("mock server closed before notification");
1990 assert_eq!(
1991 notification["method"],
1992 serde_json::json!(crate::mcp_protocol::METHOD_ROOTS_LIST_CHANGED_NOTIFICATION)
1993 );
1994
1995 handle.notify_roots_list_changed_if_needed().await.unwrap();
1996 assert!(
1997 tokio::time::timeout(std::time::Duration::from_millis(50), requests.recv())
1998 .await
1999 .is_err(),
2000 "unchanged roots should not send another notification"
2001 );
2002 })
2003 .await;
2004 }
2005
2006 #[tokio::test(flavor = "current_thread")]
2007 async fn handle_inbound_routes_sampling_to_dispatcher() {
2008 let request = serde_json::json!({
2015 "jsonrpc": "2.0",
2016 "id": 42,
2017 "method": crate::mcp_sampling::SAMPLING_METHOD,
2018 "params": {
2019 "messages": [
2020 {"role": "user", "content": {"type": "text", "text": "ping"}}
2021 ],
2022 "maxTokens": 4,
2023 },
2024 });
2025 let response = handle_inbound_client_request("mock", &request)
2026 .await
2027 .expect("sampling should produce a response");
2028 assert_eq!(response["id"], serde_json::json!(42));
2029 assert_eq!(
2030 response["error"]["data"]["type"],
2031 serde_json::json!("mcp.samplingDeclined")
2032 );
2033 }
2034}