Skip to main content

myko_server/mcp/
dispatch.rs

1//! Transport-agnostic MCP JSON-RPC dispatch.
2//!
3//! Handles `initialize`, `tools/list`, `tools/call`, `resources/list`,
4//! `resources/read`, and the relevant notifications.
5//!
6//! ## Tool / resource split
7//!
8//! - **Tools** — every registered query / view / report / command is
9//!   exposed as a tool the LLM can invoke on demand. Tools take structured
10//!   `arguments` matching the registration's input shape.
11//! - **Resources** — every tool also surfaces a *schema* resource at
12//!   `myko://schema/<kind>/<id>` whose content is the JSON Schema for the
13//!   tool's input. The schema goes through the resource shape rather than
14//!   the data because:
15//!   - Resources are URI-keyed and can't carry structured arguments, but
16//!     every query / view / report registration takes args.
17//!   - Even argument-less reads are backed by reactive cells (the data is
18//!     live), so pre-loading a snapshot into context at startup would
19//!     just go stale. On-demand `tools/call` is the right shape for live
20//!     reads.
21//!
22//! Reactive query subscriptions via `resources/subscribe` are future work.
23//!
24//! Error responses follow the [MCP 2025-06-18 error-handling shape][spec]:
25//!
26//! - **Protocol Error** — JSON-RPC error response with `code: -32602` and
27//!   message `"Unknown tool: …"`. Used when a tool is hidden by visibility
28//!   filtering (indistinguishable on the wire from a tool that does not
29//!   exist) or when required `tools/call` params are missing.
30//! - **Tool Execution Error** — successful JSON-RPC response with
31//!   `isError: true` content carrying a descriptive message. Used when
32//!   `tools/call` arguments fail client-supplied argument constraints
33//!   (the spec's "Invalid input data" category) or when tool execution
34//!   raises an error downstream.
35//!
36//! [spec]: https://modelcontextprotocol.io/specification/2025-06-18/server/tools#error-handling
37
38use myko::{
39    command::CommandRegistration, query::QueryRegistration, report::ReportRegistration,
40    view::ViewRegistration,
41};
42use serde_json::{Value, json};
43
44use super::{
45    exec::Executor,
46    filter::ClientFilters,
47    types::{McpError, McpRequest, McpResource, McpResponse, McpTool},
48};
49
/// Name of the built-in tool that reports the executor's connection status.
///
/// Unlike the registration-backed tools it carries no `<kind>:` prefix.
const CONNECTION_STATUS_TOOL: &str = "connection_status";
51
/// Identity the server advertises to clients in its `initialize` response.
#[derive(Debug, Clone)]
pub struct ServerInfo {
    /// Emitted as `serverInfo.name`.
    pub name: String,
    /// Emitted as `serverInfo.version`.
    pub version: String,
}
58
59impl Default for ServerInfo {
60    fn default() -> Self {
61        Self {
62            name: "myko-mcp".to_string(),
63            version: env!("CARGO_PKG_VERSION").to_string(),
64        }
65    }
66}
67
68/// Dispatch one JSON-RPC request. Returns `None` for notifications that do
69/// not produce a response.
70pub async fn handle_request(
71    request: McpRequest,
72    filter: &ClientFilters,
73    executor: &Executor,
74    info: &ServerInfo,
75) -> Option<McpResponse> {
76    match request.method.as_str() {
77        "initialize" => Some(handle_initialize(request.id, info)),
78        "notifications/initialized" | "notifications/cancelled" => None,
79        "tools/list" => Some(handle_tools_list(request.id, filter)),
80        "tools/call" => Some(handle_tools_call(request.id, request.params, filter, executor).await),
81        "resources/list" => Some(handle_resources_list(request.id, filter)),
82        "resources/read" => Some(handle_resources_read(request.id, request.params, filter)),
83        _ => Some(McpResponse::error(
84            request.id,
85            McpError::method_not_found(&request.method),
86        )),
87    }
88}
89
90fn handle_initialize(id: Value, info: &ServerInfo) -> McpResponse {
91    McpResponse::success(
92        id,
93        json!({
94            "protocolVersion": "2024-11-05",
95            "capabilities": {
96                "tools": {},
97                "resources": {}
98            },
99            "serverInfo": {
100                "name": info.name,
101                "version": info.version,
102            }
103        }),
104    )
105}
106
107fn handle_tools_list(id: Value, filter: &ClientFilters) -> McpResponse {
108    let mut tools: Vec<McpTool> = Vec::new();
109
110    if filter.tool_visible(CONNECTION_STATUS_TOOL) {
111        tools.push(McpTool {
112            name: CONNECTION_STATUS_TOOL.to_string(),
113            description: "Check the connection status to the Myko server".to_string(),
114            input_schema: json!({
115                "type": "object",
116                "properties": {},
117                "required": []
118            }),
119        });
120    }
121
122    for reg in inventory::iter::<QueryRegistration> {
123        let name = format!("query:{}", reg.query_id);
124        if !filter.tool_visible(&name) {
125            continue;
126        }
127        tools.push(McpTool {
128            name,
129            description: format!("Query returning {} entities", reg.query_item_type),
130            input_schema: open_object_schema(),
131        });
132    }
133
134    for reg in inventory::iter::<ViewRegistration> {
135        let name = format!("view:{}", reg.view_id);
136        if !filter.tool_visible(&name) {
137            continue;
138        }
139        tools.push(McpTool {
140            name,
141            description: format!("View returning a list of {}", reg.view_item_type),
142            input_schema: open_object_schema(),
143        });
144    }
145
146    for reg in inventory::iter::<ReportRegistration> {
147        let name = format!("report:{}", reg.report_id);
148        if !filter.tool_visible(&name) {
149            continue;
150        }
151        tools.push(McpTool {
152            name,
153            description: format!("Report returning {}", reg.output_type),
154            input_schema: open_object_schema(),
155        });
156    }
157
158    for reg in inventory::iter::<CommandRegistration> {
159        let name = format!("command:{}", reg.command_id);
160        if !filter.tool_visible(&name) {
161            continue;
162        }
163        tools.push(McpTool {
164            name,
165            description: format!("Command returning {}", reg.result_type),
166            input_schema: open_object_schema(),
167        });
168    }
169
170    McpResponse::success(id, json!({ "tools": tools }))
171}
172
173async fn handle_tools_call(
174    id: Value,
175    params: Option<Value>,
176    filter: &ClientFilters,
177    executor: &Executor,
178) -> McpResponse {
179    let Some(params) = params else {
180        return McpResponse::error(id, McpError::invalid_params("Missing params"));
181    };
182    let Some(tool_name) = params
183        .get("name")
184        .and_then(|v| v.as_str())
185        .map(str::to_string)
186    else {
187        return McpResponse::error(id, McpError::invalid_params("Missing tool name"));
188    };
189
190    // MCP Protocol Error: a hidden tool is indistinguishable on the wire from
191    // a tool that doesn't exist. Code -32602 + "Unknown tool: …" matches the
192    // example in the MCP 2025-06-18 spec (Tools / Error Handling).
193    if !filter.tool_visible(&tool_name) {
194        return McpResponse::error(
195            id,
196            McpError {
197                code: McpError::INVALID_PARAMS,
198                message: format!("Unknown tool: {}", tool_name),
199                data: None,
200            },
201        );
202    }
203
204    let arguments = params
205        .get("arguments")
206        .cloned()
207        .unwrap_or_else(|| json!({}));
208
209    // MCP Tool Execution Error ("Invalid input data" category): result is a
210    // successful JSON-RPC response carrying `isError: true` content with the
211    // descriptive constraint message verbatim — distinct from the protocol
212    // error path above.
213    if let Err(message) = filter.tool_callable(&tool_name, &arguments) {
214        return McpResponse::success(
215            id,
216            json!({
217                "content": [{
218                    "type": "text",
219                    "text": message,
220                }],
221                "isError": true,
222            }),
223        );
224    }
225
226    let result = execute_tool(executor, &tool_name, arguments).await;
227
228    match result {
229        Ok(data) => McpResponse::success(
230            id,
231            json!({
232                "content": [{
233                    "type": "text",
234                    "text": serde_json::to_string_pretty(&data).unwrap_or_default()
235                }]
236            }),
237        ),
238        Err(message) => McpResponse::success(
239            id,
240            json!({
241                "content": [{
242                    "type": "text",
243                    "text": format!("Error: {}", message)
244                }],
245                "isError": true,
246            }),
247        ),
248    }
249}
250
251async fn execute_tool(executor: &Executor, tool_name: &str, args: Value) -> Result<Value, String> {
252    if tool_name == CONNECTION_STATUS_TOOL {
253        return Ok(executor.connection_status());
254    }
255    if let Some(id) = tool_name.strip_prefix("query:") {
256        return executor.execute_query(id, args).await;
257    }
258    if let Some(id) = tool_name.strip_prefix("view:") {
259        return executor.execute_view(id, args).await;
260    }
261    if let Some(id) = tool_name.strip_prefix("report:") {
262        return executor.execute_report(id, args).await;
263    }
264    if let Some(id) = tool_name.strip_prefix("command:") {
265        return executor.execute_command(id, args).await;
266    }
267    Err(format!("Unknown tool: {}", tool_name))
268}
269
270fn handle_resources_list(id: Value, filter: &ClientFilters) -> McpResponse {
271    let mut resources: Vec<McpResource> = Vec::new();
272
273    for reg in inventory::iter::<QueryRegistration> {
274        let tool_name = format!("query:{}", reg.query_id);
275        if !filter.tool_visible(&tool_name) {
276            continue;
277        }
278        resources.push(McpResource {
279            uri: format!("myko://schema/query/{}", reg.query_id),
280            name: reg.query_id.to_string(),
281            description: Some(format!("Query returning {} entities", reg.query_item_type)),
282            mime_type: Some("application/json".to_string()),
283        });
284    }
285
286    for reg in inventory::iter::<ViewRegistration> {
287        let tool_name = format!("view:{}", reg.view_id);
288        if !filter.tool_visible(&tool_name) {
289            continue;
290        }
291        resources.push(McpResource {
292            uri: format!("myko://schema/view/{}", reg.view_id),
293            name: reg.view_id.to_string(),
294            description: Some(format!("View returning a list of {}", reg.view_item_type)),
295            mime_type: Some("application/json".to_string()),
296        });
297    }
298
299    for reg in inventory::iter::<ReportRegistration> {
300        let tool_name = format!("report:{}", reg.report_id);
301        if !filter.tool_visible(&tool_name) {
302            continue;
303        }
304        resources.push(McpResource {
305            uri: format!("myko://schema/report/{}", reg.report_id),
306            name: reg.report_id.to_string(),
307            description: Some(format!("Report returning {}", reg.output_type)),
308            mime_type: Some("application/json".to_string()),
309        });
310    }
311
312    for reg in inventory::iter::<CommandRegistration> {
313        let tool_name = format!("command:{}", reg.command_id);
314        if !filter.tool_visible(&tool_name) {
315            continue;
316        }
317        resources.push(McpResource {
318            uri: format!("myko://schema/command/{}", reg.command_id),
319            name: format!("{} (command)", reg.command_id),
320            description: Some(format!("Command returning {}", reg.result_type)),
321            mime_type: Some("application/json".to_string()),
322        });
323    }
324
325    McpResponse::success(id, json!({ "resources": resources }))
326}
327
328fn handle_resources_read(id: Value, params: Option<Value>, filter: &ClientFilters) -> McpResponse {
329    let Some(params) = params else {
330        return McpResponse::error(id, McpError::invalid_params("Missing params"));
331    };
332    let Some(uri) = params.get("uri").and_then(|v| v.as_str()) else {
333        return McpResponse::error(id, McpError::invalid_params("Missing uri"));
334    };
335
336    if let Some(path) = uri.strip_prefix("myko://schema/") {
337        let parts: Vec<&str> = path.splitn(2, '/').collect();
338        if parts.len() == 2 {
339            let (schema_type, schema_id) = (parts[0], parts[1]);
340            let tool_name = format!("{}:{}", schema_type, schema_id);
341            if !filter.tool_visible(&tool_name) {
342                return McpResponse::error(
343                    id,
344                    McpError {
345                        code: McpError::INVALID_PARAMS,
346                        message: format!("Resource not accessible: {}", uri),
347                        data: None,
348                    },
349                );
350            }
351            let content = match schema_type {
352                "query" => get_query_schema(schema_id),
353                "view" => get_view_schema(schema_id),
354                "report" => get_report_schema(schema_id),
355                "command" => get_command_schema(schema_id),
356                _ => None,
357            };
358            if let Some(content) = content {
359                return McpResponse::success(
360                    id,
361                    json!({
362                        "contents": [{
363                            "uri": uri,
364                            "mimeType": "application/json",
365                            "text": content,
366                        }]
367                    }),
368                );
369            }
370        }
371    }
372
373    McpResponse::error(
374        id,
375        McpError {
376            code: McpError::INVALID_PARAMS,
377            message: format!("Resource not found: {}", uri),
378            data: None,
379        },
380    )
381}
382
383fn open_object_schema() -> Value {
384    json!({
385        "type": "object",
386        "additionalProperties": true
387    })
388}
389
390fn get_query_schema(query_id: &str) -> Option<String> {
391    for reg in inventory::iter::<QueryRegistration> {
392        if reg.query_id == query_id {
393            let schema = json!({
394                "$schema": "http://json-schema.org/draft-07/schema#",
395                "title": reg.query_id,
396                "description": format!("Query returning {} entities", reg.query_item_type),
397                "type": "object",
398                "additionalProperties": true,
399            });
400            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
401        }
402    }
403    None
404}
405
406fn get_view_schema(view_id: &str) -> Option<String> {
407    for reg in inventory::iter::<ViewRegistration> {
408        if reg.view_id == view_id {
409            let schema = json!({
410                "$schema": "http://json-schema.org/draft-07/schema#",
411                "title": reg.view_id,
412                "description": format!("View returning a list of {}", reg.view_item_type),
413                "type": "object",
414                "additionalProperties": true,
415            });
416            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
417        }
418    }
419    None
420}
421
422fn get_report_schema(report_id: &str) -> Option<String> {
423    for reg in inventory::iter::<ReportRegistration> {
424        if reg.report_id == report_id {
425            let schema = json!({
426                "$schema": "http://json-schema.org/draft-07/schema#",
427                "title": reg.report_id,
428                "description": format!("Report returning {}", reg.output_type),
429                "type": "object",
430                "additionalProperties": true,
431            });
432            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
433        }
434    }
435    None
436}
437
438fn get_command_schema(command_id: &str) -> Option<String> {
439    for reg in inventory::iter::<CommandRegistration> {
440        if reg.command_id == command_id {
441            let schema = json!({
442                "$schema": "http://json-schema.org/draft-07/schema#",
443                "title": reg.command_id,
444                "description": format!("Command returning {}", reg.result_type),
445                "type": "object",
446                "additionalProperties": true,
447            });
448            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
449        }
450    }
451    None
452}
453
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use serde_json::Value;

    use super::*;

    /// Build a params-less request for `method` with a fixed numeric id.
    fn make_request(method: &str) -> McpRequest {
        McpRequest {
            jsonrpc: String::from("2.0"),
            id: Value::Number(1.into()),
            method: method.to_owned(),
            params: None,
        }
    }

    /// Send `method` through `handle_request` with an allow-all filter.
    ///
    /// None of the methods exercised below reach the executor, but the
    /// signature requires one. An in-process executor would need a context,
    /// which is heavy to set up here, so a dummy `MykoClient` is used instead.
    async fn dispatch(method: &str, info: &ServerInfo) -> Option<McpResponse> {
        let filter = ClientFilters::allow_all();
        let executor = Executor::Client(Arc::new(myko::client::MykoClient::new()));
        handle_request(make_request(method), &filter, &executor, info).await
    }

    #[tokio::test]
    async fn initialize_returns_server_info() {
        let info = ServerInfo {
            name: "test".into(),
            version: "0.0.0".into(),
        };

        let response = dispatch("initialize", &info)
            .await
            .expect("initialize must produce a response");
        let result = response.result.expect("initialize must have a result");

        assert_eq!(result["serverInfo"]["name"], "test");
        assert_eq!(result["serverInfo"]["version"], "0.0.0");
    }

    #[tokio::test]
    async fn notifications_produce_no_response() {
        let info = ServerInfo::default();
        let response = dispatch("notifications/initialized", &info).await;
        assert!(response.is_none());
    }

    #[tokio::test]
    async fn unknown_method_returns_error() {
        let info = ServerInfo::default();
        let response = dispatch("unknown/method", &info)
            .await
            .expect("must produce a response");
        assert!(response.error.is_some());
    }
}