1use std::cell::RefCell;
2
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::mpsc;
5
6use crate::mcp_elicit::{install_bus, ElicitationBus};
7use crate::mcp_progress::{
8 active_bus as active_progress_bus, install_active_bus as install_active_progress_bus,
9 is_valid_progress_token, scope_context, ProgressBus, ProgressContext,
10};
11use crate::stdlib::json_to_vm_value;
12use crate::value::VmError;
13use crate::vm::Vm;
14
15use super::convert::{prompt_value_to_messages, vm_value_to_content, vm_value_to_json};
16use super::defs::{
17 McpCompletionSource, McpPromptDef, McpResourceDef, McpResourceTemplateDef, McpToolDef,
18};
19use super::uri::{match_uri_template, uri_template_variables};
20use super::PROTOCOL_VERSION;
21
22pub struct McpServer {
24 server_name: String,
25 server_version: String,
26 tools: Vec<McpToolDef>,
27 resources: Vec<McpResourceDef>,
28 resource_templates: Vec<McpResourceTemplateDef>,
29 prompts: Vec<McpPromptDef>,
30 log_level: RefCell<String>,
31 server_card: Option<serde_json::Value>,
36}
37
38impl McpServer {
39 pub fn new(
40 server_name: String,
41 tools: Vec<McpToolDef>,
42 resources: Vec<McpResourceDef>,
43 resource_templates: Vec<McpResourceTemplateDef>,
44 prompts: Vec<McpPromptDef>,
45 ) -> Self {
46 Self {
47 server_name,
48 server_version: env!("CARGO_PKG_VERSION").to_string(),
49 tools,
50 resources,
51 resource_templates,
52 prompts,
53 log_level: RefCell::new("warning".to_string()),
54 server_card: None,
55 }
56 }
57
58 pub fn with_server_card(mut self, card: serde_json::Value) -> Self {
62 self.server_card = Some(card);
63 self
64 }
65
66 pub async fn run(&self, vm: &mut Vm) -> Result<(), VmError> {
76 let (out_tx, mut out_rx) = mpsc::unbounded_channel::<serde_json::Value>();
77 let (in_tx, mut in_rx) = mpsc::unbounded_channel::<serde_json::Value>();
78 let bus = ElicitationBus::new(out_tx.clone());
79 let progress_bus = ProgressBus::from_mpsc(out_tx.clone());
80
81 let bus_for_reader = bus.clone();
82 let in_tx_reader = in_tx.clone();
83 let reader = tokio::spawn(async move {
84 let stdin = BufReader::new(tokio::io::stdin());
85 let mut lines = stdin.lines();
86 while let Ok(Some(line)) = lines.next_line().await {
87 let trimmed = line.trim();
88 if trimmed.is_empty() {
89 continue;
90 }
91 let msg: serde_json::Value = match serde_json::from_str(trimmed) {
92 Ok(v) => v,
93 Err(_) => continue,
94 };
95 if msg.get("method").is_none() {
101 let _ = bus_for_reader.route_response(&msg);
102 continue;
103 }
104 if in_tx_reader.send(msg).is_err() {
105 break;
106 }
107 }
108 });
109 drop(in_tx);
112
113 let writer = tokio::spawn(async move {
114 let mut stdout = tokio::io::stdout();
115 while let Some(msg) = out_rx.recv().await {
116 let mut line = match serde_json::to_string(&msg) {
117 Ok(value) => value,
118 Err(_) => continue,
119 };
120 line.push('\n');
121 if stdout.write_all(line.as_bytes()).await.is_err() {
122 break;
123 }
124 if stdout.flush().await.is_err() {
125 break;
126 }
127 }
128 });
129
130 let previous_bus = install_bus(Some(bus));
132 let previous_progress = install_active_progress_bus(Some(progress_bus));
133
134 while let Some(msg) = in_rx.recv().await {
135 if let Some(response) = self.handle_json_rpc(msg, vm).await {
136 if out_tx.send(response).is_err() {
137 break;
138 }
139 }
140 }
141
142 drop(out_tx);
146 install_bus(previous_bus);
147 install_active_progress_bus(previous_progress);
148
149 let _ = writer.await;
153 reader.abort();
154 Ok(())
155 }
156
157 pub async fn handle_json_rpc(
159 &self,
160 msg: serde_json::Value,
161 vm: &mut Vm,
162 ) -> Option<serde_json::Value> {
163 let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
164 let id = msg.get("id").cloned()?;
165 let params = msg.get("params").cloned().unwrap_or(serde_json::json!({}));
166
167 if let Some(response) =
168 crate::mcp_protocol::unsupported_client_bound_method_response(id.clone(), method)
169 {
170 return Some(response);
171 }
172
173 Some(match method {
174 "initialize" => self.handle_initialize(&id),
175 "ping" => crate::jsonrpc::response(id.clone(), serde_json::json!({})),
176 "logging/setLevel" => self.handle_logging_set_level(&id, ¶ms),
177 "harn.hitl.respond" => self.handle_hitl_respond(&id, ¶ms).await,
178 "tools/list" => self.handle_tools_list(&id, ¶ms),
179 "tools/call" => self.handle_tools_call(&id, ¶ms, vm).await,
180 crate::mcp_protocol::METHOD_TASKS_GET => self.handle_task_lookup(&id, ¶ms),
181 crate::mcp_protocol::METHOD_TASKS_RESULT => self.handle_task_lookup(&id, ¶ms),
182 crate::mcp_protocol::METHOD_TASKS_LIST => self.handle_tasks_list(&id, ¶ms),
183 crate::mcp_protocol::METHOD_TASKS_CANCEL => self.handle_task_lookup(&id, ¶ms),
184 "resources/list" => self.handle_resources_list(&id, ¶ms),
185 "resources/read" => self.handle_resources_read(&id, ¶ms, vm).await,
186 "resources/subscribe" => self.handle_resources_subscribe(&id, ¶ms),
187 "resources/unsubscribe" => self.handle_resources_unsubscribe(&id, ¶ms),
188 "resources/templates/list" => self.handle_resource_templates_list(&id, ¶ms),
189 "prompts/list" => self.handle_prompts_list(&id, ¶ms),
190 "prompts/get" => self.handle_prompts_get(&id, ¶ms, vm).await,
191 crate::mcp_protocol::METHOD_COMPLETION_COMPLETE => {
192 self.handle_completion_complete(&id, ¶ms, vm).await
193 }
194 _ if crate::mcp_protocol::unsupported_latest_spec_method(method).is_some() => {
195 crate::mcp_protocol::unsupported_latest_spec_method_response(id.clone(), method)
196 .expect("checked unsupported MCP method")
197 }
198 _ => serde_json::json!({
199 "jsonrpc": "2.0",
200 "id": id,
201 "error": {
202 "code": -32601,
203 "message": format!("Method not found: {method}")
204 }
205 }),
206 })
207 }
208
209 fn handle_initialize(&self, id: &serde_json::Value) -> serde_json::Value {
210 let mut capabilities = serde_json::Map::new();
211 if !self.tools.is_empty() {
212 capabilities.insert("tools".into(), serde_json::json!({ "listChanged": true }));
213 }
214 if !self.resources.is_empty()
215 || !self.resource_templates.is_empty()
216 || self.server_card.is_some()
217 {
218 capabilities.insert(
219 "resources".into(),
220 serde_json::json!({ "listChanged": true, "subscribe": true }),
221 );
222 }
223 if !self.prompts.is_empty() {
224 capabilities.insert("prompts".into(), serde_json::json!({ "listChanged": true }));
225 }
226 capabilities.insert("logging".into(), serde_json::json!({}));
227 capabilities.insert("tasks".into(), crate::mcp_protocol::tasks_capability());
228 capabilities.insert(
229 "completions".into(),
230 crate::mcp_protocol::completions_capability(),
231 );
232 capabilities.insert("elicitation".into(), serde_json::json!({}));
236
237 let mut server_info = serde_json::json!({
238 "name": self.server_name,
239 "version": self.server_version
240 });
241 if let Some(ref card) = self.server_card {
242 server_info["card"] = card.clone();
243 }
244
245 serde_json::json!({
246 "jsonrpc": "2.0",
247 "id": id,
248 "result": {
249 "protocolVersion": PROTOCOL_VERSION,
250 "capabilities": capabilities,
251 "serverInfo": server_info
252 }
253 })
254 }
255
256 fn handle_tools_list(
257 &self,
258 id: &serde_json::Value,
259 params: &serde_json::Value,
260 ) -> serde_json::Value {
261 let page = match crate::mcp_protocol::mcp_list_page(params, self.tools.len(), "tools/list")
262 {
263 Ok(page) => page,
264 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
265 };
266 let tools: Vec<serde_json::Value> = self.tools[page.start..page.end]
267 .iter()
268 .map(|t| {
269 let mut entry = serde_json::json!({
270 "name": t.name,
271 "description": t.description,
272 "inputSchema": t.input_schema,
273 });
274 if let Some(ref title) = t.title {
275 entry["title"] = serde_json::json!(title);
276 }
277 if let Some(ref output_schema) = t.output_schema {
278 entry["outputSchema"] = output_schema.clone();
279 }
280 if let Some(ref annotations) = t.annotations {
281 entry["annotations"] = annotations.clone();
282 }
283 if let Some(ref icons) = t.icons {
284 entry["icons"] = icons.clone();
285 }
286 entry["execution"] = crate::mcp_protocol::tool_execution(
287 crate::mcp_protocol::McpToolTaskSupport::Forbidden,
288 );
289 entry
290 })
291 .collect();
292
293 let mut result = serde_json::json!({ "tools": tools });
294 if let Some(next_cursor) = page.next_cursor {
295 result["nextCursor"] = serde_json::json!(next_cursor);
296 }
297
298 serde_json::json!({
299 "jsonrpc": "2.0",
300 "id": id,
301 "result": result
302 })
303 }
304
305 async fn handle_tools_call(
306 &self,
307 id: &serde_json::Value,
308 params: &serde_json::Value,
309 vm: &mut Vm,
310 ) -> serde_json::Value {
311 let tool_name = params.get("name").and_then(|n| n.as_str()).unwrap_or("");
312 if crate::mcp_protocol::requests_task_augmentation(params) {
313 return crate::mcp_protocol::task_augmentation_error_response(
314 id.clone(),
315 "tools/call",
316 -32602,
317 "Tool does not support MCP task-augmented execution",
318 "This Harn MCP server executes registered Harn closures inline, so every tool advertises execution.taskSupport=\"forbidden\".",
319 );
320 }
321
322 let tool = match self.tools.iter().find(|t| t.name == tool_name) {
323 Some(t) => t,
324 None => {
325 return serde_json::json!({
326 "jsonrpc": "2.0",
327 "id": id,
328 "error": { "code": -32602, "message": format!("Unknown tool: {tool_name}") }
329 });
330 }
331 };
332
333 let arguments = params
334 .get("arguments")
335 .cloned()
336 .unwrap_or(serde_json::json!({}));
337 let args_vm = json_to_vm_value(&arguments);
338
339 let progress_token = params
348 .pointer("/_meta/progressToken")
349 .cloned()
350 .filter(is_valid_progress_token);
351 let progress_ctx = progress_token
352 .and_then(|token| active_progress_bus().map(|bus| ProgressContext::new(bus, token)));
353
354 let result =
355 scope_context(progress_ctx, vm.call_closure_pub(&tool.handler, &[args_vm])).await;
356
357 match result {
358 Ok(value) => {
359 let content = vm_value_to_content(&value);
360 let mut call_result = serde_json::json!({
361 "content": content,
362 "isError": false
363 });
364 if tool.output_schema.is_some() {
365 let text = value.display();
366 let structured = match serde_json::from_str::<serde_json::Value>(&text) {
367 Ok(v) => v,
368 _ => serde_json::json!(text),
369 };
370 call_result["structuredContent"] = structured;
371 }
372 serde_json::json!({
373 "jsonrpc": "2.0",
374 "id": id,
375 "result": call_result
376 })
377 }
378 Err(e) => serde_json::json!({
379 "jsonrpc": "2.0",
380 "id": id,
381 "result": {
382 "content": [{ "type": "text", "text": format!("{e}") }],
383 "isError": true
384 }
385 }),
386 }
387 }
388
389 fn handle_task_lookup(
390 &self,
391 id: &serde_json::Value,
392 params: &serde_json::Value,
393 ) -> serde_json::Value {
394 let task_id = params
395 .get("taskId")
396 .and_then(|value| value.as_str())
397 .unwrap_or_default();
398 serde_json::json!({
399 "jsonrpc": "2.0",
400 "id": id,
401 "error": {
402 "code": -32602,
403 "message": format!("Failed to retrieve task: task not found '{task_id}'")
404 }
405 })
406 }
407
408 fn handle_tasks_list(
409 &self,
410 id: &serde_json::Value,
411 _params: &serde_json::Value,
412 ) -> serde_json::Value {
413 serde_json::json!({
414 "jsonrpc": "2.0",
415 "id": id,
416 "result": { "tasks": [] }
417 })
418 }
419
420 async fn handle_hitl_respond(
421 &self,
422 id: &serde_json::Value,
423 params: &serde_json::Value,
424 ) -> serde_json::Value {
425 let response: crate::stdlib::hitl::HitlHostResponse =
426 match serde_json::from_value(params.clone()) {
427 Ok(response) => response,
428 Err(error) => {
429 return serde_json::json!({
430 "jsonrpc": "2.0",
431 "id": id,
432 "error": {
433 "code": -32602,
434 "message": format!("invalid harn.hitl.respond params: {error}"),
435 }
436 });
437 }
438 };
439 let cwd = std::env::current_dir().ok();
440 match crate::stdlib::hitl::append_hitl_response(cwd.as_deref(), response).await {
441 Ok(_) => serde_json::json!({
442 "jsonrpc": "2.0",
443 "id": id,
444 "result": { "ok": true }
445 }),
446 Err(error) => serde_json::json!({
447 "jsonrpc": "2.0",
448 "id": id,
449 "error": {
450 "code": -32000,
451 "message": error
452 }
453 }),
454 }
455 }
456
457 fn handle_resources_list(
458 &self,
459 id: &serde_json::Value,
460 params: &serde_json::Value,
461 ) -> serde_json::Value {
462 let mut all_resources = Vec::with_capacity(self.resources.len() + 1);
463 if self.server_card.is_some() {
464 all_resources.push(serde_json::json!({
465 "uri": "well-known://mcp-card",
466 "name": "Server Card",
467 "description": "MCP v2.1 Server Card advertising this server's identity and capabilities",
468 "mimeType": "application/json",
469 }));
470 }
471 all_resources.extend(self.resources.iter().map(|r| {
472 let mut entry = serde_json::json!({ "uri": r.uri, "name": r.name });
473 if let Some(ref title) = r.title {
474 entry["title"] = serde_json::json!(title);
475 }
476 if let Some(ref desc) = r.description {
477 entry["description"] = serde_json::json!(desc);
478 }
479 if let Some(ref mime) = r.mime_type {
480 entry["mimeType"] = serde_json::json!(mime);
481 }
482 entry
483 }));
484
485 let page =
486 match crate::mcp_protocol::mcp_list_page(params, all_resources.len(), "resources/list")
487 {
488 Ok(page) => page,
489 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
490 };
491 let resources = all_resources[page.start..page.end].to_vec();
492
493 let mut result = serde_json::json!({ "resources": resources });
494 if let Some(next_cursor) = page.next_cursor {
495 result["nextCursor"] = serde_json::json!(next_cursor);
496 }
497
498 serde_json::json!({
499 "jsonrpc": "2.0",
500 "id": id,
501 "result": result
502 })
503 }
504
505 async fn handle_resources_read(
506 &self,
507 id: &serde_json::Value,
508 params: &serde_json::Value,
509 vm: &mut Vm,
510 ) -> serde_json::Value {
511 let uri = params.get("uri").and_then(|u| u.as_str()).unwrap_or("");
512
513 if uri == "well-known://mcp-card" {
517 if let Some(ref card) = self.server_card {
518 let content = serde_json::json!({
519 "uri": uri,
520 "text": serde_json::to_string(card).unwrap_or_else(|_| "{}".to_string()),
521 "mimeType": "application/json",
522 });
523 return serde_json::json!({
524 "jsonrpc": "2.0",
525 "id": id,
526 "result": { "contents": [content] }
527 });
528 }
529 }
530
531 if let Some(resource) = self.resources.iter().find(|r| r.uri == uri) {
533 let mut content = serde_json::json!({ "uri": resource.uri, "text": resource.text });
534 if let Some(ref mime) = resource.mime_type {
535 content["mimeType"] = serde_json::json!(mime);
536 }
537 return serde_json::json!({
538 "jsonrpc": "2.0",
539 "id": id,
540 "result": { "contents": [content] }
541 });
542 }
543
544 for tmpl in &self.resource_templates {
545 if let Some(args) = match_uri_template(&tmpl.uri_template, uri) {
546 let args_vm = json_to_vm_value(&serde_json::json!(args));
547 let result = vm.call_closure_pub(&tmpl.handler, &[args_vm]).await;
548 return match result {
549 Ok(value) => {
550 let mut content = serde_json::json!({
551 "uri": uri,
552 "text": value.display(),
553 });
554 if let Some(ref mime) = tmpl.mime_type {
555 content["mimeType"] = serde_json::json!(mime);
556 }
557 serde_json::json!({
558 "jsonrpc": "2.0",
559 "id": id,
560 "result": { "contents": [content] }
561 })
562 }
563 Err(e) => serde_json::json!({
564 "jsonrpc": "2.0",
565 "id": id,
566 "error": { "code": -32603, "message": format!("{e}") }
567 }),
568 };
569 }
570 }
571
572 serde_json::json!({
573 "jsonrpc": "2.0",
574 "id": id,
575 "error": { "code": -32002, "message": format!("Resource not found: {uri}") }
576 })
577 }
578
579 fn handle_resources_subscribe(
580 &self,
581 id: &serde_json::Value,
582 params: &serde_json::Value,
583 ) -> serde_json::Value {
584 let uri = params.get("uri").and_then(|u| u.as_str()).unwrap_or("");
585 if !self.resource_uri_exists(uri) {
586 return serde_json::json!({
587 "jsonrpc": "2.0",
588 "id": id,
589 "error": { "code": -32002, "message": format!("Resource not found: {uri}") }
590 });
591 }
592 crate::jsonrpc::response(id.clone(), serde_json::json!({}))
593 }
594
595 fn handle_resources_unsubscribe(
596 &self,
597 id: &serde_json::Value,
598 _params: &serde_json::Value,
599 ) -> serde_json::Value {
600 crate::jsonrpc::response(id.clone(), serde_json::json!({}))
601 }
602
603 fn resource_uri_exists(&self, uri: &str) -> bool {
604 if uri == "well-known://mcp-card" {
605 return self.server_card.is_some();
606 }
607 self.resources.iter().any(|resource| resource.uri == uri)
608 || self
609 .resource_templates
610 .iter()
611 .any(|template| match_uri_template(&template.uri_template, uri).is_some())
612 }
613
614 fn handle_resource_templates_list(
615 &self,
616 id: &serde_json::Value,
617 params: &serde_json::Value,
618 ) -> serde_json::Value {
619 let page = match crate::mcp_protocol::mcp_list_page(
620 params,
621 self.resource_templates.len(),
622 "resources/templates/list",
623 ) {
624 Ok(page) => page,
625 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
626 };
627 let templates: Vec<serde_json::Value> = self.resource_templates[page.start..page.end]
628 .iter()
629 .map(|t| {
630 let mut entry =
631 serde_json::json!({ "uriTemplate": t.uri_template, "name": t.name });
632 if let Some(ref title) = t.title {
633 entry["title"] = serde_json::json!(title);
634 }
635 if let Some(ref desc) = t.description {
636 entry["description"] = serde_json::json!(desc);
637 }
638 if let Some(ref mime) = t.mime_type {
639 entry["mimeType"] = serde_json::json!(mime);
640 }
641 entry
642 })
643 .collect();
644
645 let mut result = serde_json::json!({ "resourceTemplates": templates });
646 if let Some(next_cursor) = page.next_cursor {
647 result["nextCursor"] = serde_json::json!(next_cursor);
648 }
649
650 serde_json::json!({
651 "jsonrpc": "2.0",
652 "id": id,
653 "result": result
654 })
655 }
656
657 fn handle_prompts_list(
658 &self,
659 id: &serde_json::Value,
660 params: &serde_json::Value,
661 ) -> serde_json::Value {
662 let page =
663 match crate::mcp_protocol::mcp_list_page(params, self.prompts.len(), "prompts/list") {
664 Ok(page) => page,
665 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
666 };
667 let prompts: Vec<serde_json::Value> = self.prompts[page.start..page.end]
668 .iter()
669 .map(|p| {
670 let mut entry = serde_json::json!({ "name": p.name });
671 if let Some(ref title) = p.title {
672 entry["title"] = serde_json::json!(title);
673 }
674 if let Some(ref desc) = p.description {
675 entry["description"] = serde_json::json!(desc);
676 }
677 if let Some(ref args) = p.arguments {
678 let args_json: Vec<serde_json::Value> = args
679 .iter()
680 .map(|a| {
681 let mut arg =
682 serde_json::json!({ "name": a.name, "required": a.required });
683 if let Some(ref desc) = a.description {
684 arg["description"] = serde_json::json!(desc);
685 }
686 arg
687 })
688 .collect();
689 entry["arguments"] = serde_json::json!(args_json);
690 }
691 entry
692 })
693 .collect();
694
695 let mut result = serde_json::json!({ "prompts": prompts });
696 if let Some(next_cursor) = page.next_cursor {
697 result["nextCursor"] = serde_json::json!(next_cursor);
698 }
699
700 serde_json::json!({
701 "jsonrpc": "2.0",
702 "id": id,
703 "result": result
704 })
705 }
706
707 fn handle_logging_set_level(
708 &self,
709 id: &serde_json::Value,
710 params: &serde_json::Value,
711 ) -> serde_json::Value {
712 let level = params
713 .get("level")
714 .and_then(|l| l.as_str())
715 .unwrap_or("warning");
716 *self.log_level.borrow_mut() = level.to_string();
717 crate::jsonrpc::response(id.clone(), serde_json::json!({}))
718 }
719
720 async fn handle_prompts_get(
721 &self,
722 id: &serde_json::Value,
723 params: &serde_json::Value,
724 vm: &mut Vm,
725 ) -> serde_json::Value {
726 let name = params.get("name").and_then(|n| n.as_str()).unwrap_or("");
727
728 let prompt = match self.prompts.iter().find(|p| p.name == name) {
729 Some(p) => p,
730 None => {
731 return serde_json::json!({
732 "jsonrpc": "2.0",
733 "id": id,
734 "error": { "code": -32602, "message": format!("Unknown prompt: {name}") }
735 });
736 }
737 };
738
739 let arguments = params
740 .get("arguments")
741 .cloned()
742 .unwrap_or(serde_json::json!({}));
743 let args_vm = json_to_vm_value(&arguments);
744
745 let result = vm.call_closure_pub(&prompt.handler, &[args_vm]).await;
746
747 match result {
748 Ok(value) => {
749 let messages = prompt_value_to_messages(&value);
750 serde_json::json!({
751 "jsonrpc": "2.0",
752 "id": id,
753 "result": { "messages": messages }
754 })
755 }
756 Err(e) => serde_json::json!({
757 "jsonrpc": "2.0",
758 "id": id,
759 "error": { "code": -32603, "message": format!("{e}") }
760 }),
761 }
762 }
763
764 async fn handle_completion_complete(
765 &self,
766 id: &serde_json::Value,
767 params: &serde_json::Value,
768 vm: &mut Vm,
769 ) -> serde_json::Value {
770 let Some(ref_type) = params.pointer("/ref/type").and_then(|value| value.as_str()) else {
771 return crate::jsonrpc::error_response(
772 id.clone(),
773 -32602,
774 "completion ref.type is required",
775 );
776 };
777 match ref_type {
778 "ref/prompt" => self.complete_prompt_argument(id, params, vm).await,
779 "ref/resource" => {
780 self.complete_resource_template_argument(id, params, vm)
781 .await
782 }
783 other => crate::jsonrpc::error_response(
784 id.clone(),
785 -32602,
786 &format!("Unsupported completion ref.type: {other}"),
787 ),
788 }
789 }
790
791 async fn complete_prompt_argument(
792 &self,
793 id: &serde_json::Value,
794 params: &serde_json::Value,
795 vm: &mut Vm,
796 ) -> serde_json::Value {
797 let name = params
798 .pointer("/ref/name")
799 .and_then(|value| value.as_str())
800 .unwrap_or("");
801 let argument_name = match completion_argument_name(params) {
802 Ok(name) => name,
803 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
804 };
805 let value = completion_argument_value(params);
806 let prompt = match self.prompts.iter().find(|prompt| prompt.name == name) {
807 Some(prompt) => prompt,
808 None => {
809 return crate::jsonrpc::error_response(
810 id.clone(),
811 -32602,
812 &format!("Unknown prompt: {name}"),
813 );
814 }
815 };
816 let Some(argument) = prompt
817 .arguments
818 .as_deref()
819 .unwrap_or_default()
820 .iter()
821 .find(|argument| argument.name == argument_name)
822 else {
823 return crate::jsonrpc::error_response(
824 id.clone(),
825 -32602,
826 &format!("Unknown prompt argument: {argument_name}"),
827 );
828 };
829 let candidates =
830 match completion_source_candidates(argument.completion.as_ref(), params, vm).await {
831 Ok(candidates) => candidates,
832 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32603, &error),
833 };
834 crate::mcp_protocol::completion_result(id.clone(), candidates, value)
835 }
836
837 async fn complete_resource_template_argument(
838 &self,
839 id: &serde_json::Value,
840 params: &serde_json::Value,
841 vm: &mut Vm,
842 ) -> serde_json::Value {
843 let uri = params
844 .pointer("/ref/uri")
845 .and_then(|value| value.as_str())
846 .unwrap_or("");
847 let argument_name = match completion_argument_name(params) {
848 Ok(name) => name,
849 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
850 };
851 let value = completion_argument_value(params);
852 let template = match self
853 .resource_templates
854 .iter()
855 .find(|template| template.uri_template == uri)
856 {
857 Some(template) => template,
858 None => {
859 return crate::jsonrpc::error_response(
860 id.clone(),
861 -32602,
862 &format!("Unknown resource template: {uri}"),
863 );
864 }
865 };
866 if !uri_template_variables(&template.uri_template)
867 .iter()
868 .any(|name| name == argument_name)
869 {
870 return crate::jsonrpc::error_response(
871 id.clone(),
872 -32602,
873 &format!("Unknown resource template argument: {argument_name}"),
874 );
875 }
876 let candidates =
877 match completion_source_candidates(template.completions.get(argument_name), params, vm)
878 .await
879 {
880 Ok(candidates) => candidates,
881 Err(error) => return crate::jsonrpc::error_response(id.clone(), -32603, &error),
882 };
883 crate::mcp_protocol::completion_result(id.clone(), candidates, value)
884 }
885}
886
887fn completion_argument_name(params: &serde_json::Value) -> Result<&str, String> {
888 params
889 .pointer("/argument/name")
890 .and_then(|value| value.as_str())
891 .filter(|value| !value.is_empty())
892 .ok_or_else(|| "completion argument.name is required".to_string())
893}
894
895fn completion_argument_value(params: &serde_json::Value) -> &str {
896 params
897 .pointer("/argument/value")
898 .and_then(|value| value.as_str())
899 .unwrap_or_default()
900}
901
902async fn completion_source_candidates(
903 source: Option<&McpCompletionSource>,
904 params: &serde_json::Value,
905 vm: &mut Vm,
906) -> Result<Vec<String>, String> {
907 let Some(source) = source else {
908 return Ok(Vec::new());
909 };
910 let mut candidates = source.values.clone();
911 if let Some(handler) = source.handler.as_ref() {
912 let request = json_to_vm_value(params);
913 let value = vm
914 .call_closure_pub(handler, &[request])
915 .await
916 .map_err(|error| format!("{error}"))?;
917 candidates.extend(completion_candidates_from_json(&vm_value_to_json(&value)));
918 }
919 Ok(candidates)
920}
921
922fn completion_candidates_from_json(value: &serde_json::Value) -> Vec<String> {
923 match value {
924 serde_json::Value::Array(items) => {
925 items.iter().filter_map(json_completion_string).collect()
926 }
927 serde_json::Value::Object(map) => map
928 .get("values")
929 .or_else(|| map.get("completion").and_then(|value| value.get("values")))
930 .map(completion_candidates_from_json)
931 .unwrap_or_default(),
932 _ => json_completion_string(value).into_iter().collect(),
933 }
934}
935
936fn json_completion_string(value: &serde_json::Value) -> Option<String> {
937 match value {
938 serde_json::Value::String(value) => Some(value.clone()),
939 serde_json::Value::Number(_) | serde_json::Value::Bool(_) => Some(value.to_string()),
940 _ => None,
941 }
942}