Skip to main content

harn_vm/mcp_server/
server.rs

1use std::cell::RefCell;
2
3use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
4use tokio::sync::mpsc;
5
6use crate::mcp_elicit::{install_bus, ElicitationBus};
7use crate::mcp_progress::{
8    active_bus as active_progress_bus, install_active_bus as install_active_progress_bus,
9    is_valid_progress_token, scope_context, ProgressBus, ProgressContext,
10};
11use crate::stdlib::json_to_vm_value;
12use crate::value::VmError;
13use crate::vm::Vm;
14
15use super::convert::{prompt_value_to_messages, vm_value_to_content, vm_value_to_json};
16use super::defs::{
17    McpCompletionSource, McpPromptDef, McpResourceDef, McpResourceTemplateDef, McpToolDef,
18};
19use super::uri::{match_uri_template, uri_template_variables};
20use super::PROTOCOL_VERSION;
21
/// MCP server that exposes Harn tools, resources, and prompts over MCP JSON-RPC.
pub struct McpServer {
    /// Name reported as `serverInfo.name` in the `initialize` response.
    server_name: String,
    /// Version reported as `serverInfo.version`; `new` fills it from
    /// `CARGO_PKG_VERSION`.
    server_version: String,
    /// Tool definitions listed by `tools/list` and invoked by `tools/call`.
    tools: Vec<McpToolDef>,
    /// Static resources listed by `resources/list` and served by
    /// `resources/read`.
    resources: Vec<McpResourceDef>,
    /// URI-templated resources; `resources/read` matches the requested URI
    /// against each template and calls the matching template's handler.
    resource_templates: Vec<McpResourceTemplateDef>,
    /// Prompt definitions listed by `prompts/list` and rendered by
    /// `prompts/get`.
    prompts: Vec<McpPromptDef>,
    /// Level most recently stored by `logging/setLevel`; starts as
    /// `"warning"`.
    log_level: RefCell<String>,
    /// Optional Server Card payload — advertised in the `initialize`
    /// response's `serverInfo.card` field and exposed as a static
    /// resource at the well-known URI `well-known://mcp-card`.
    /// Populated by `harn serve mcp --card path/to/card.json`.
    server_card: Option<serde_json::Value>,
}
37
38impl McpServer {
39    pub fn new(
40        server_name: String,
41        tools: Vec<McpToolDef>,
42        resources: Vec<McpResourceDef>,
43        resource_templates: Vec<McpResourceTemplateDef>,
44        prompts: Vec<McpPromptDef>,
45    ) -> Self {
46        Self {
47            server_name,
48            server_version: env!("CARGO_PKG_VERSION").to_string(),
49            tools,
50            resources,
51            resource_templates,
52            prompts,
53            log_level: RefCell::new("warning".to_string()),
54            server_card: None,
55        }
56    }
57
58    /// Attach a Server Card to be advertised over `initialize` and via
59    /// the `well-known://mcp-card` resource. Call on a freshly-built
60    /// `McpServer` before `run`.
61    pub fn with_server_card(mut self, card: serde_json::Value) -> Self {
62        self.server_card = Some(card);
63        self
64    }
65
66    /// Run the MCP server loop, reading JSON-RPC from stdin and writing
67    /// to stdout. The transport is split across three concurrent halves:
68    /// a stdin reader that demuxes responses to in-flight elicitation
69    /// requests away from new client requests, a stdout writer task
70    /// that drains the outbound queue, and the main dispatch loop that
71    /// owns the VM and processes new requests one at a time. This shape
72    /// is what allows a tool handler to call `mcp_elicit(...)` mid-flight
73    /// — the handler awaits an inbound response while the writer keeps
74    /// emitting the elicitation request.
75    pub async fn run(&self, vm: &mut Vm) -> Result<(), VmError> {
76        let (out_tx, mut out_rx) = mpsc::unbounded_channel::<serde_json::Value>();
77        let (in_tx, mut in_rx) = mpsc::unbounded_channel::<serde_json::Value>();
78        let bus = ElicitationBus::new(out_tx.clone());
79        let progress_bus = ProgressBus::from_mpsc(out_tx.clone());
80
81        let bus_for_reader = bus.clone();
82        let in_tx_reader = in_tx.clone();
83        let reader = tokio::spawn(async move {
84            let stdin = BufReader::new(tokio::io::stdin());
85            let mut lines = stdin.lines();
86            while let Ok(Some(line)) = lines.next_line().await {
87                let trimmed = line.trim();
88                if trimmed.is_empty() {
89                    continue;
90                }
91                let msg: serde_json::Value = match serde_json::from_str(trimmed) {
92                    Ok(v) => v,
93                    Err(_) => continue,
94                };
95                // Per JSON-RPC, responses (no `method`, has `id` + result/error)
96                // must not themselves be replied to. Route to the
97                // elicitation bus when we recognize the id; otherwise
98                // silently drop instead of letting the dispatcher
99                // bounce a "Method not found" back to the client.
100                if msg.get("method").is_none() {
101                    let _ = bus_for_reader.route_response(&msg);
102                    continue;
103                }
104                if in_tx_reader.send(msg).is_err() {
105                    break;
106                }
107            }
108        });
109        // Drop the inbound sender held by the spawn closure's parent
110        // scope so closing of stdin actually terminates the dispatcher.
111        drop(in_tx);
112
113        let writer = tokio::spawn(async move {
114            let mut stdout = tokio::io::stdout();
115            while let Some(msg) = out_rx.recv().await {
116                let mut line = match serde_json::to_string(&msg) {
117                    Ok(value) => value,
118                    Err(_) => continue,
119                };
120                line.push('\n');
121                if stdout.write_all(line.as_bytes()).await.is_err() {
122                    break;
123                }
124                if stdout.flush().await.is_err() {
125                    break;
126                }
127            }
128        });
129
130        // Make the bus visible to tool handlers running on this thread.
131        let previous_bus = install_bus(Some(bus));
132        let previous_progress = install_active_progress_bus(Some(progress_bus));
133
134        while let Some(msg) = in_rx.recv().await {
135            if let Some(response) = self.handle_json_rpc(msg, vm).await {
136                if out_tx.send(response).is_err() {
137                    break;
138                }
139            }
140        }
141
142        // Closing out_tx tells the writer task to drain and exit.
143        // Restoring the previous bus (rather than always wiping)
144        // keeps nested usages well-defined if they ever appear.
145        drop(out_tx);
146        install_bus(previous_bus);
147        install_active_progress_bus(previous_progress);
148
149        // Best-effort wait so the writer flushes any tail responses
150        // before the function returns. Reader task exits naturally
151        // when stdin closes.
152        let _ = writer.await;
153        reader.abort();
154        Ok(())
155    }
156
157    /// Handle one MCP JSON-RPC message. Notifications return `None`.
158    pub async fn handle_json_rpc(
159        &self,
160        msg: serde_json::Value,
161        vm: &mut Vm,
162    ) -> Option<serde_json::Value> {
163        let method = msg.get("method").and_then(|m| m.as_str()).unwrap_or("");
164        let id = msg.get("id").cloned()?;
165        let params = msg.get("params").cloned().unwrap_or(serde_json::json!({}));
166
167        if let Some(response) =
168            crate::mcp_protocol::unsupported_client_bound_method_response(id.clone(), method)
169        {
170            return Some(response);
171        }
172
173        Some(match method {
174            "initialize" => self.handle_initialize(&id),
175            "ping" => crate::jsonrpc::response(id.clone(), serde_json::json!({})),
176            "logging/setLevel" => self.handle_logging_set_level(&id, &params),
177            "harn.hitl.respond" => self.handle_hitl_respond(&id, &params).await,
178            "tools/list" => self.handle_tools_list(&id, &params),
179            "tools/call" => self.handle_tools_call(&id, &params, vm).await,
180            crate::mcp_protocol::METHOD_TASKS_GET => self.handle_task_lookup(&id, &params),
181            crate::mcp_protocol::METHOD_TASKS_RESULT => self.handle_task_lookup(&id, &params),
182            crate::mcp_protocol::METHOD_TASKS_LIST => self.handle_tasks_list(&id, &params),
183            crate::mcp_protocol::METHOD_TASKS_CANCEL => self.handle_task_lookup(&id, &params),
184            "resources/list" => self.handle_resources_list(&id, &params),
185            "resources/read" => self.handle_resources_read(&id, &params, vm).await,
186            "resources/subscribe" => self.handle_resources_subscribe(&id, &params),
187            "resources/unsubscribe" => self.handle_resources_unsubscribe(&id, &params),
188            "resources/templates/list" => self.handle_resource_templates_list(&id, &params),
189            "prompts/list" => self.handle_prompts_list(&id, &params),
190            "prompts/get" => self.handle_prompts_get(&id, &params, vm).await,
191            crate::mcp_protocol::METHOD_COMPLETION_COMPLETE => {
192                self.handle_completion_complete(&id, &params, vm).await
193            }
194            _ if crate::mcp_protocol::unsupported_latest_spec_method(method).is_some() => {
195                crate::mcp_protocol::unsupported_latest_spec_method_response(id.clone(), method)
196                    .expect("checked unsupported MCP method")
197            }
198            _ => serde_json::json!({
199                "jsonrpc": "2.0",
200                "id": id,
201                "error": {
202                    "code": -32601,
203                    "message": format!("Method not found: {method}")
204                }
205            }),
206        })
207    }
208
209    fn handle_initialize(&self, id: &serde_json::Value) -> serde_json::Value {
210        let mut capabilities = serde_json::Map::new();
211        if !self.tools.is_empty() {
212            capabilities.insert("tools".into(), serde_json::json!({ "listChanged": true }));
213        }
214        if !self.resources.is_empty()
215            || !self.resource_templates.is_empty()
216            || self.server_card.is_some()
217        {
218            capabilities.insert(
219                "resources".into(),
220                serde_json::json!({ "listChanged": true, "subscribe": true }),
221            );
222        }
223        if !self.prompts.is_empty() {
224            capabilities.insert("prompts".into(), serde_json::json!({ "listChanged": true }));
225        }
226        capabilities.insert("logging".into(), serde_json::json!({}));
227        capabilities.insert("tasks".into(), crate::mcp_protocol::tasks_capability());
228        capabilities.insert(
229            "completions".into(),
230            crate::mcp_protocol::completions_capability(),
231        );
232        // Always advertise elicitation: any registered tool may decide
233        // at runtime whether to call `mcp_elicit(...)`, so capability
234        // negotiation can't be tied to a static check.
235        capabilities.insert("elicitation".into(), serde_json::json!({}));
236
237        let mut server_info = serde_json::json!({
238            "name": self.server_name,
239            "version": self.server_version
240        });
241        if let Some(ref card) = self.server_card {
242            server_info["card"] = card.clone();
243        }
244
245        serde_json::json!({
246            "jsonrpc": "2.0",
247            "id": id,
248            "result": {
249                "protocolVersion": PROTOCOL_VERSION,
250                "capabilities": capabilities,
251                "serverInfo": server_info
252            }
253        })
254    }
255
256    fn handle_tools_list(
257        &self,
258        id: &serde_json::Value,
259        params: &serde_json::Value,
260    ) -> serde_json::Value {
261        let page = match crate::mcp_protocol::mcp_list_page(params, self.tools.len(), "tools/list")
262        {
263            Ok(page) => page,
264            Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
265        };
266        let tools: Vec<serde_json::Value> = self.tools[page.start..page.end]
267            .iter()
268            .map(|t| {
269                let mut entry = serde_json::json!({
270                    "name": t.name,
271                    "description": t.description,
272                    "inputSchema": t.input_schema,
273                });
274                if let Some(ref title) = t.title {
275                    entry["title"] = serde_json::json!(title);
276                }
277                if let Some(ref output_schema) = t.output_schema {
278                    entry["outputSchema"] = output_schema.clone();
279                }
280                if let Some(ref annotations) = t.annotations {
281                    entry["annotations"] = annotations.clone();
282                }
283                if let Some(ref icons) = t.icons {
284                    entry["icons"] = icons.clone();
285                }
286                entry["execution"] = crate::mcp_protocol::tool_execution(
287                    crate::mcp_protocol::McpToolTaskSupport::Forbidden,
288                );
289                entry
290            })
291            .collect();
292
293        let mut result = serde_json::json!({ "tools": tools });
294        if let Some(next_cursor) = page.next_cursor {
295            result["nextCursor"] = serde_json::json!(next_cursor);
296        }
297
298        serde_json::json!({
299            "jsonrpc": "2.0",
300            "id": id,
301            "result": result
302        })
303    }
304
305    async fn handle_tools_call(
306        &self,
307        id: &serde_json::Value,
308        params: &serde_json::Value,
309        vm: &mut Vm,
310    ) -> serde_json::Value {
311        let tool_name = params.get("name").and_then(|n| n.as_str()).unwrap_or("");
312        if crate::mcp_protocol::requests_task_augmentation(params) {
313            return crate::mcp_protocol::task_augmentation_error_response(
314                id.clone(),
315                "tools/call",
316                -32602,
317                "Tool does not support MCP task-augmented execution",
318                "This Harn MCP server executes registered Harn closures inline, so every tool advertises execution.taskSupport=\"forbidden\".",
319            );
320        }
321
322        let tool = match self.tools.iter().find(|t| t.name == tool_name) {
323            Some(t) => t,
324            None => {
325                return serde_json::json!({
326                    "jsonrpc": "2.0",
327                    "id": id,
328                    "error": { "code": -32602, "message": format!("Unknown tool: {tool_name}") }
329                });
330            }
331        };
332
333        let arguments = params
334            .get("arguments")
335            .cloned()
336            .unwrap_or(serde_json::json!({}));
337        let args_vm = json_to_vm_value(&arguments);
338
339        // Bind a per-call progress context so the handler (and any
340        // helpers it calls) can emit `notifications/progress` via
341        // `mcp_report_progress(...)`. The context is wired only when
342        // both the connection has a progress bus installed AND the
343        // client opted in via `_meta.progressToken`. Using
344        // `scope_context` (a tokio task-local) rather than a
345        // thread-local guard keeps concurrent tool calls isolated even
346        // when they share an OS thread via a `LocalSet`.
347        let progress_token = params
348            .pointer("/_meta/progressToken")
349            .cloned()
350            .filter(is_valid_progress_token);
351        let progress_ctx = progress_token
352            .and_then(|token| active_progress_bus().map(|bus| ProgressContext::new(bus, token)));
353
354        let result =
355            scope_context(progress_ctx, vm.call_closure_pub(&tool.handler, &[args_vm])).await;
356
357        match result {
358            Ok(value) => {
359                let content = vm_value_to_content(&value);
360                let mut call_result = serde_json::json!({
361                    "content": content,
362                    "isError": false
363                });
364                if tool.output_schema.is_some() {
365                    let text = value.display();
366                    let structured = match serde_json::from_str::<serde_json::Value>(&text) {
367                        Ok(v) => v,
368                        _ => serde_json::json!(text),
369                    };
370                    call_result["structuredContent"] = structured;
371                }
372                serde_json::json!({
373                    "jsonrpc": "2.0",
374                    "id": id,
375                    "result": call_result
376                })
377            }
378            Err(e) => serde_json::json!({
379                "jsonrpc": "2.0",
380                "id": id,
381                "result": {
382                    "content": [{ "type": "text", "text": format!("{e}") }],
383                    "isError": true
384                }
385            }),
386        }
387    }
388
389    fn handle_task_lookup(
390        &self,
391        id: &serde_json::Value,
392        params: &serde_json::Value,
393    ) -> serde_json::Value {
394        let task_id = params
395            .get("taskId")
396            .and_then(|value| value.as_str())
397            .unwrap_or_default();
398        serde_json::json!({
399            "jsonrpc": "2.0",
400            "id": id,
401            "error": {
402                "code": -32602,
403                "message": format!("Failed to retrieve task: task not found '{task_id}'")
404            }
405        })
406    }
407
408    fn handle_tasks_list(
409        &self,
410        id: &serde_json::Value,
411        _params: &serde_json::Value,
412    ) -> serde_json::Value {
413        serde_json::json!({
414            "jsonrpc": "2.0",
415            "id": id,
416            "result": { "tasks": [] }
417        })
418    }
419
420    async fn handle_hitl_respond(
421        &self,
422        id: &serde_json::Value,
423        params: &serde_json::Value,
424    ) -> serde_json::Value {
425        let response: crate::stdlib::hitl::HitlHostResponse =
426            match serde_json::from_value(params.clone()) {
427                Ok(response) => response,
428                Err(error) => {
429                    return serde_json::json!({
430                        "jsonrpc": "2.0",
431                        "id": id,
432                        "error": {
433                            "code": -32602,
434                            "message": format!("invalid harn.hitl.respond params: {error}"),
435                        }
436                    });
437                }
438            };
439        let cwd = std::env::current_dir().ok();
440        match crate::stdlib::hitl::append_hitl_response(cwd.as_deref(), response).await {
441            Ok(_) => serde_json::json!({
442                "jsonrpc": "2.0",
443                "id": id,
444                "result": { "ok": true }
445            }),
446            Err(error) => serde_json::json!({
447                "jsonrpc": "2.0",
448                "id": id,
449                "error": {
450                    "code": -32000,
451                    "message": error
452                }
453            }),
454        }
455    }
456
457    fn handle_resources_list(
458        &self,
459        id: &serde_json::Value,
460        params: &serde_json::Value,
461    ) -> serde_json::Value {
462        let mut all_resources = Vec::with_capacity(self.resources.len() + 1);
463        if self.server_card.is_some() {
464            all_resources.push(serde_json::json!({
465                "uri": "well-known://mcp-card",
466                "name": "Server Card",
467                "description": "MCP v2.1 Server Card advertising this server's identity and capabilities",
468                "mimeType": "application/json",
469            }));
470        }
471        all_resources.extend(self.resources.iter().map(|r| {
472            let mut entry = serde_json::json!({ "uri": r.uri, "name": r.name });
473            if let Some(ref title) = r.title {
474                entry["title"] = serde_json::json!(title);
475            }
476            if let Some(ref desc) = r.description {
477                entry["description"] = serde_json::json!(desc);
478            }
479            if let Some(ref mime) = r.mime_type {
480                entry["mimeType"] = serde_json::json!(mime);
481            }
482            entry
483        }));
484
485        let page =
486            match crate::mcp_protocol::mcp_list_page(params, all_resources.len(), "resources/list")
487            {
488                Ok(page) => page,
489                Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
490            };
491        let resources = all_resources[page.start..page.end].to_vec();
492
493        let mut result = serde_json::json!({ "resources": resources });
494        if let Some(next_cursor) = page.next_cursor {
495            result["nextCursor"] = serde_json::json!(next_cursor);
496        }
497
498        serde_json::json!({
499            "jsonrpc": "2.0",
500            "id": id,
501            "result": result
502        })
503    }
504
505    async fn handle_resources_read(
506        &self,
507        id: &serde_json::Value,
508        params: &serde_json::Value,
509        vm: &mut Vm,
510    ) -> serde_json::Value {
511        let uri = params.get("uri").and_then(|u| u.as_str()).unwrap_or("");
512
513        // Expose the Server Card at the well-known URI. Matches the
514        // HTTP convention (.well-known/mcp-card) but routed through
515        // the stdio resource protocol.
516        if uri == "well-known://mcp-card" {
517            if let Some(ref card) = self.server_card {
518                let content = serde_json::json!({
519                    "uri": uri,
520                    "text": serde_json::to_string(card).unwrap_or_else(|_| "{}".to_string()),
521                    "mimeType": "application/json",
522                });
523                return serde_json::json!({
524                    "jsonrpc": "2.0",
525                    "id": id,
526                    "result": { "contents": [content] }
527                });
528            }
529        }
530
531        // Static resources take precedence over templates.
532        if let Some(resource) = self.resources.iter().find(|r| r.uri == uri) {
533            let mut content = serde_json::json!({ "uri": resource.uri, "text": resource.text });
534            if let Some(ref mime) = resource.mime_type {
535                content["mimeType"] = serde_json::json!(mime);
536            }
537            return serde_json::json!({
538                "jsonrpc": "2.0",
539                "id": id,
540                "result": { "contents": [content] }
541            });
542        }
543
544        for tmpl in &self.resource_templates {
545            if let Some(args) = match_uri_template(&tmpl.uri_template, uri) {
546                let args_vm = json_to_vm_value(&serde_json::json!(args));
547                let result = vm.call_closure_pub(&tmpl.handler, &[args_vm]).await;
548                return match result {
549                    Ok(value) => {
550                        let mut content = serde_json::json!({
551                            "uri": uri,
552                            "text": value.display(),
553                        });
554                        if let Some(ref mime) = tmpl.mime_type {
555                            content["mimeType"] = serde_json::json!(mime);
556                        }
557                        serde_json::json!({
558                            "jsonrpc": "2.0",
559                            "id": id,
560                            "result": { "contents": [content] }
561                        })
562                    }
563                    Err(e) => serde_json::json!({
564                        "jsonrpc": "2.0",
565                        "id": id,
566                        "error": { "code": -32603, "message": format!("{e}") }
567                    }),
568                };
569            }
570        }
571
572        serde_json::json!({
573            "jsonrpc": "2.0",
574            "id": id,
575            "error": { "code": -32002, "message": format!("Resource not found: {uri}") }
576        })
577    }
578
579    fn handle_resources_subscribe(
580        &self,
581        id: &serde_json::Value,
582        params: &serde_json::Value,
583    ) -> serde_json::Value {
584        let uri = params.get("uri").and_then(|u| u.as_str()).unwrap_or("");
585        if !self.resource_uri_exists(uri) {
586            return serde_json::json!({
587                "jsonrpc": "2.0",
588                "id": id,
589                "error": { "code": -32002, "message": format!("Resource not found: {uri}") }
590            });
591        }
592        crate::jsonrpc::response(id.clone(), serde_json::json!({}))
593    }
594
595    fn handle_resources_unsubscribe(
596        &self,
597        id: &serde_json::Value,
598        _params: &serde_json::Value,
599    ) -> serde_json::Value {
600        crate::jsonrpc::response(id.clone(), serde_json::json!({}))
601    }
602
603    fn resource_uri_exists(&self, uri: &str) -> bool {
604        if uri == "well-known://mcp-card" {
605            return self.server_card.is_some();
606        }
607        self.resources.iter().any(|resource| resource.uri == uri)
608            || self
609                .resource_templates
610                .iter()
611                .any(|template| match_uri_template(&template.uri_template, uri).is_some())
612    }
613
614    fn handle_resource_templates_list(
615        &self,
616        id: &serde_json::Value,
617        params: &serde_json::Value,
618    ) -> serde_json::Value {
619        let page = match crate::mcp_protocol::mcp_list_page(
620            params,
621            self.resource_templates.len(),
622            "resources/templates/list",
623        ) {
624            Ok(page) => page,
625            Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
626        };
627        let templates: Vec<serde_json::Value> = self.resource_templates[page.start..page.end]
628            .iter()
629            .map(|t| {
630                let mut entry =
631                    serde_json::json!({ "uriTemplate": t.uri_template, "name": t.name });
632                if let Some(ref title) = t.title {
633                    entry["title"] = serde_json::json!(title);
634                }
635                if let Some(ref desc) = t.description {
636                    entry["description"] = serde_json::json!(desc);
637                }
638                if let Some(ref mime) = t.mime_type {
639                    entry["mimeType"] = serde_json::json!(mime);
640                }
641                entry
642            })
643            .collect();
644
645        let mut result = serde_json::json!({ "resourceTemplates": templates });
646        if let Some(next_cursor) = page.next_cursor {
647            result["nextCursor"] = serde_json::json!(next_cursor);
648        }
649
650        serde_json::json!({
651            "jsonrpc": "2.0",
652            "id": id,
653            "result": result
654        })
655    }
656
657    fn handle_prompts_list(
658        &self,
659        id: &serde_json::Value,
660        params: &serde_json::Value,
661    ) -> serde_json::Value {
662        let page =
663            match crate::mcp_protocol::mcp_list_page(params, self.prompts.len(), "prompts/list") {
664                Ok(page) => page,
665                Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
666            };
667        let prompts: Vec<serde_json::Value> = self.prompts[page.start..page.end]
668            .iter()
669            .map(|p| {
670                let mut entry = serde_json::json!({ "name": p.name });
671                if let Some(ref title) = p.title {
672                    entry["title"] = serde_json::json!(title);
673                }
674                if let Some(ref desc) = p.description {
675                    entry["description"] = serde_json::json!(desc);
676                }
677                if let Some(ref args) = p.arguments {
678                    let args_json: Vec<serde_json::Value> = args
679                        .iter()
680                        .map(|a| {
681                            let mut arg =
682                                serde_json::json!({ "name": a.name, "required": a.required });
683                            if let Some(ref desc) = a.description {
684                                arg["description"] = serde_json::json!(desc);
685                            }
686                            arg
687                        })
688                        .collect();
689                    entry["arguments"] = serde_json::json!(args_json);
690                }
691                entry
692            })
693            .collect();
694
695        let mut result = serde_json::json!({ "prompts": prompts });
696        if let Some(next_cursor) = page.next_cursor {
697            result["nextCursor"] = serde_json::json!(next_cursor);
698        }
699
700        serde_json::json!({
701            "jsonrpc": "2.0",
702            "id": id,
703            "result": result
704        })
705    }
706
707    fn handle_logging_set_level(
708        &self,
709        id: &serde_json::Value,
710        params: &serde_json::Value,
711    ) -> serde_json::Value {
712        let level = params
713            .get("level")
714            .and_then(|l| l.as_str())
715            .unwrap_or("warning");
716        *self.log_level.borrow_mut() = level.to_string();
717        crate::jsonrpc::response(id.clone(), serde_json::json!({}))
718    }
719
720    async fn handle_prompts_get(
721        &self,
722        id: &serde_json::Value,
723        params: &serde_json::Value,
724        vm: &mut Vm,
725    ) -> serde_json::Value {
726        let name = params.get("name").and_then(|n| n.as_str()).unwrap_or("");
727
728        let prompt = match self.prompts.iter().find(|p| p.name == name) {
729            Some(p) => p,
730            None => {
731                return serde_json::json!({
732                    "jsonrpc": "2.0",
733                    "id": id,
734                    "error": { "code": -32602, "message": format!("Unknown prompt: {name}") }
735                });
736            }
737        };
738
739        let arguments = params
740            .get("arguments")
741            .cloned()
742            .unwrap_or(serde_json::json!({}));
743        let args_vm = json_to_vm_value(&arguments);
744
745        let result = vm.call_closure_pub(&prompt.handler, &[args_vm]).await;
746
747        match result {
748            Ok(value) => {
749                let messages = prompt_value_to_messages(&value);
750                serde_json::json!({
751                    "jsonrpc": "2.0",
752                    "id": id,
753                    "result": { "messages": messages }
754                })
755            }
756            Err(e) => serde_json::json!({
757                "jsonrpc": "2.0",
758                "id": id,
759                "error": { "code": -32603, "message": format!("{e}") }
760            }),
761        }
762    }
763
764    async fn handle_completion_complete(
765        &self,
766        id: &serde_json::Value,
767        params: &serde_json::Value,
768        vm: &mut Vm,
769    ) -> serde_json::Value {
770        let Some(ref_type) = params.pointer("/ref/type").and_then(|value| value.as_str()) else {
771            return crate::jsonrpc::error_response(
772                id.clone(),
773                -32602,
774                "completion ref.type is required",
775            );
776        };
777        match ref_type {
778            "ref/prompt" => self.complete_prompt_argument(id, params, vm).await,
779            "ref/resource" => {
780                self.complete_resource_template_argument(id, params, vm)
781                    .await
782            }
783            other => crate::jsonrpc::error_response(
784                id.clone(),
785                -32602,
786                &format!("Unsupported completion ref.type: {other}"),
787            ),
788        }
789    }
790
791    async fn complete_prompt_argument(
792        &self,
793        id: &serde_json::Value,
794        params: &serde_json::Value,
795        vm: &mut Vm,
796    ) -> serde_json::Value {
797        let name = params
798            .pointer("/ref/name")
799            .and_then(|value| value.as_str())
800            .unwrap_or("");
801        let argument_name = match completion_argument_name(params) {
802            Ok(name) => name,
803            Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
804        };
805        let value = completion_argument_value(params);
806        let prompt = match self.prompts.iter().find(|prompt| prompt.name == name) {
807            Some(prompt) => prompt,
808            None => {
809                return crate::jsonrpc::error_response(
810                    id.clone(),
811                    -32602,
812                    &format!("Unknown prompt: {name}"),
813                );
814            }
815        };
816        let Some(argument) = prompt
817            .arguments
818            .as_deref()
819            .unwrap_or_default()
820            .iter()
821            .find(|argument| argument.name == argument_name)
822        else {
823            return crate::jsonrpc::error_response(
824                id.clone(),
825                -32602,
826                &format!("Unknown prompt argument: {argument_name}"),
827            );
828        };
829        let candidates =
830            match completion_source_candidates(argument.completion.as_ref(), params, vm).await {
831                Ok(candidates) => candidates,
832                Err(error) => return crate::jsonrpc::error_response(id.clone(), -32603, &error),
833            };
834        crate::mcp_protocol::completion_result(id.clone(), candidates, value)
835    }
836
837    async fn complete_resource_template_argument(
838        &self,
839        id: &serde_json::Value,
840        params: &serde_json::Value,
841        vm: &mut Vm,
842    ) -> serde_json::Value {
843        let uri = params
844            .pointer("/ref/uri")
845            .and_then(|value| value.as_str())
846            .unwrap_or("");
847        let argument_name = match completion_argument_name(params) {
848            Ok(name) => name,
849            Err(error) => return crate::jsonrpc::error_response(id.clone(), -32602, &error),
850        };
851        let value = completion_argument_value(params);
852        let template = match self
853            .resource_templates
854            .iter()
855            .find(|template| template.uri_template == uri)
856        {
857            Some(template) => template,
858            None => {
859                return crate::jsonrpc::error_response(
860                    id.clone(),
861                    -32602,
862                    &format!("Unknown resource template: {uri}"),
863                );
864            }
865        };
866        if !uri_template_variables(&template.uri_template)
867            .iter()
868            .any(|name| name == argument_name)
869        {
870            return crate::jsonrpc::error_response(
871                id.clone(),
872                -32602,
873                &format!("Unknown resource template argument: {argument_name}"),
874            );
875        }
876        let candidates =
877            match completion_source_candidates(template.completions.get(argument_name), params, vm)
878                .await
879            {
880                Ok(candidates) => candidates,
881                Err(error) => return crate::jsonrpc::error_response(id.clone(), -32603, &error),
882            };
883        crate::mcp_protocol::completion_result(id.clone(), candidates, value)
884    }
885}
886
887fn completion_argument_name(params: &serde_json::Value) -> Result<&str, String> {
888    params
889        .pointer("/argument/name")
890        .and_then(|value| value.as_str())
891        .filter(|value| !value.is_empty())
892        .ok_or_else(|| "completion argument.name is required".to_string())
893}
894
895fn completion_argument_value(params: &serde_json::Value) -> &str {
896    params
897        .pointer("/argument/value")
898        .and_then(|value| value.as_str())
899        .unwrap_or_default()
900}
901
902async fn completion_source_candidates(
903    source: Option<&McpCompletionSource>,
904    params: &serde_json::Value,
905    vm: &mut Vm,
906) -> Result<Vec<String>, String> {
907    let Some(source) = source else {
908        return Ok(Vec::new());
909    };
910    let mut candidates = source.values.clone();
911    if let Some(handler) = source.handler.as_ref() {
912        let request = json_to_vm_value(params);
913        let value = vm
914            .call_closure_pub(handler, &[request])
915            .await
916            .map_err(|error| format!("{error}"))?;
917        candidates.extend(completion_candidates_from_json(&vm_value_to_json(&value)));
918    }
919    Ok(candidates)
920}
921
922fn completion_candidates_from_json(value: &serde_json::Value) -> Vec<String> {
923    match value {
924        serde_json::Value::Array(items) => {
925            items.iter().filter_map(json_completion_string).collect()
926        }
927        serde_json::Value::Object(map) => map
928            .get("values")
929            .or_else(|| map.get("completion").and_then(|value| value.get("values")))
930            .map(completion_candidates_from_json)
931            .unwrap_or_default(),
932        _ => json_completion_string(value).into_iter().collect(),
933    }
934}
935
936fn json_completion_string(value: &serde_json::Value) -> Option<String> {
937    match value {
938        serde_json::Value::String(value) => Some(value.clone()),
939        serde_json::Value::Number(_) | serde_json::Value::Bool(_) => Some(value.to_string()),
940        _ => None,
941    }
942}