Skip to main content

myko_server/mcp/
dispatch.rs

1//! Transport-agnostic MCP JSON-RPC dispatch.
2//!
3//! Handles `initialize`, `tools/list`, `tools/call`, `resources/list`,
4//! `resources/read`, and the relevant notifications.
5//!
6//! ## Tool / resource split
7//!
8//! - **Tools** — every registered query / view / report / command is
9//!   exposed as a tool the LLM can invoke on demand. Tools take structured
10//!   `arguments` matching the registration's input shape.
11//! - **Resources** — every tool also surfaces a *schema* resource at
12//!   `myko://schema/<kind>/<id>` whose content is the JSON Schema for the
13//!   tool's input. The schema goes through the resource shape rather than
14//!   the data because:
15//!   - Resources are URI-keyed and can't carry structured arguments, but
16//!     every query / view / report registration takes args.
17//!   - Even argument-less reads are backed by reactive cells (the data is
18//!     live), so pre-loading a snapshot into context at startup would
19//!     just go stale. On-demand `tools/call` is the right shape for live
20//!     reads.
21//!
22//! Reactive query subscriptions via `resources/subscribe` are future work.
23//!
24//! Error responses follow the [MCP 2025-06-18 error-handling shape][spec]:
25//!
26//! - **Protocol Error** — JSON-RPC error response with `code: -32602` and
27//!   message `"Unknown tool: …"`. Used when a tool is hidden by visibility
28//!   filtering (indistinguishable on the wire from a tool that does not
29//!   exist) or when required `tools/call` params are missing.
30//! - **Tool Execution Error** — successful JSON-RPC response with
31//!   `isError: true` content carrying a descriptive message. Used when
32//!   `tools/call` arguments fail client-supplied argument constraints
33//!   (the spec's "Invalid input data" category) or when tool execution
34//!   raises an error downstream.
35//!
36//! [spec]: https://modelcontextprotocol.io/specification/2025-06-18/server/tools#error-handling
37
38use myko::{
39    command::CommandRegistration, query::QueryRegistration, report::ReportRegistration,
40    view::ViewRegistration,
41};
42use serde_json::{Value, json};
43
44use super::{
45    exec::Executor,
46    filter::ClientFilters,
47    types::{McpError, McpRequest, McpResource, McpResponse, McpTool},
48};
49
50const CONNECTION_STATUS_TOOL: &str = "connection_status";
51
52/// Server identity for the `initialize` response.
53#[derive(Debug, Clone)]
54pub struct ServerInfo {
55    pub name: String,
56    pub version: String,
57    /// Optional `instructions` text returned in the `initialize` response.
58    /// MCP clients surface this to the model on connect; use it to teach
59    /// agents how to use this server.
60    pub instructions: Option<String>,
61}
62
63impl Default for ServerInfo {
64    fn default() -> Self {
65        Self {
66            name: "myko-mcp".to_string(),
67            version: env!("CARGO_PKG_VERSION").to_string(),
68            instructions: None,
69        }
70    }
71}
72
73/// Dispatch one JSON-RPC request. Returns `None` for notifications that do
74/// not produce a response.
75pub async fn handle_request(
76    request: McpRequest,
77    filter: &ClientFilters,
78    executor: &Executor,
79    info: &ServerInfo,
80) -> Option<McpResponse> {
81    match request.method.as_str() {
82        "initialize" => Some(handle_initialize(request.id, info)),
83        "notifications/initialized" | "notifications/cancelled" => None,
84        "tools/list" => Some(handle_tools_list(request.id, filter)),
85        "tools/call" => Some(handle_tools_call(request.id, request.params, filter, executor).await),
86        "resources/list" => Some(handle_resources_list(request.id, filter)),
87        "resources/read" => Some(handle_resources_read(request.id, request.params, filter)),
88        _ => Some(McpResponse::error(
89            request.id,
90            McpError::method_not_found(&request.method),
91        )),
92    }
93}
94
95fn handle_initialize(id: Value, info: &ServerInfo) -> McpResponse {
96    let mut payload = json!({
97        "protocolVersion": "2024-11-05",
98        "capabilities": {
99            "tools": {},
100            "resources": {}
101        },
102        "serverInfo": {
103            "name": info.name,
104            "version": info.version,
105        }
106    });
107    if let Some(text) = &info.instructions {
108        payload
109            .as_object_mut()
110            .expect("payload is an object literal above")
111            .insert("instructions".to_string(), Value::String(text.clone()));
112    }
113    McpResponse::success(id, payload)
114}
115
116fn handle_tools_list(id: Value, filter: &ClientFilters) -> McpResponse {
117    let mut tools: Vec<McpTool> = Vec::new();
118
119    if filter.tool_visible(CONNECTION_STATUS_TOOL) {
120        tools.push(McpTool {
121            name: CONNECTION_STATUS_TOOL.to_string(),
122            description: "Check the connection status to the Myko server".to_string(),
123            input_schema: json!({
124                "type": "object",
125                "properties": {},
126                "required": []
127            }),
128        });
129    }
130
131    for reg in inventory::iter::<QueryRegistration> {
132        let name = format!("query:{}", reg.query_id);
133        if !filter.tool_visible(&name) {
134            continue;
135        }
136        tools.push(McpTool {
137            name,
138            description: format!("Query returning {} entities", reg.query_item_type),
139            input_schema: open_object_schema(),
140        });
141    }
142
143    for reg in inventory::iter::<ViewRegistration> {
144        let name = format!("view:{}", reg.view_id);
145        if !filter.tool_visible(&name) {
146            continue;
147        }
148        tools.push(McpTool {
149            name,
150            description: format!("View returning a list of {}", reg.view_item_type),
151            input_schema: open_object_schema(),
152        });
153    }
154
155    for reg in inventory::iter::<ReportRegistration> {
156        let name = format!("report:{}", reg.report_id);
157        if !filter.tool_visible(&name) {
158            continue;
159        }
160        tools.push(McpTool {
161            name,
162            description: format!("Report returning {}", reg.output_type),
163            input_schema: open_object_schema(),
164        });
165    }
166
167    for reg in inventory::iter::<CommandRegistration> {
168        let name = format!("command:{}", reg.command_id);
169        if !filter.tool_visible(&name) {
170            continue;
171        }
172        tools.push(McpTool {
173            name,
174            description: format!("Command returning {}", reg.result_type),
175            input_schema: open_object_schema(),
176        });
177    }
178
179    McpResponse::success(id, json!({ "tools": tools }))
180}
181
182async fn handle_tools_call(
183    id: Value,
184    params: Option<Value>,
185    filter: &ClientFilters,
186    executor: &Executor,
187) -> McpResponse {
188    let Some(params) = params else {
189        return McpResponse::error(id, McpError::invalid_params("Missing params"));
190    };
191    let Some(tool_name) = params
192        .get("name")
193        .and_then(|v| v.as_str())
194        .map(str::to_string)
195    else {
196        return McpResponse::error(id, McpError::invalid_params("Missing tool name"));
197    };
198
199    // MCP Protocol Error: a hidden tool is indistinguishable on the wire from
200    // a tool that doesn't exist. Code -32602 + "Unknown tool: …" matches the
201    // example in the MCP 2025-06-18 spec (Tools / Error Handling).
202    if !filter.tool_visible(&tool_name) {
203        return McpResponse::error(
204            id,
205            McpError {
206                code: McpError::INVALID_PARAMS,
207                message: format!("Unknown tool: {}", tool_name),
208                data: None,
209            },
210        );
211    }
212
213    let arguments = params
214        .get("arguments")
215        .cloned()
216        .unwrap_or_else(|| json!({}));
217
218    // MCP Tool Execution Error ("Invalid input data" category): result is a
219    // successful JSON-RPC response carrying `isError: true` content with the
220    // descriptive constraint message verbatim — distinct from the protocol
221    // error path above.
222    if let Err(message) = filter.tool_callable(&tool_name, &arguments) {
223        return McpResponse::success(
224            id,
225            json!({
226                "content": [{
227                    "type": "text",
228                    "text": message,
229                }],
230                "isError": true,
231            }),
232        );
233    }
234
235    let result = execute_tool(executor, &tool_name, arguments).await;
236
237    match result {
238        Ok(data) => McpResponse::success(
239            id,
240            json!({
241                "content": [{
242                    "type": "text",
243                    "text": serde_json::to_string_pretty(&data).unwrap_or_default()
244                }]
245            }),
246        ),
247        Err(message) => McpResponse::success(
248            id,
249            json!({
250                "content": [{
251                    "type": "text",
252                    "text": format!("Error: {}", message)
253                }],
254                "isError": true,
255            }),
256        ),
257    }
258}
259
260async fn execute_tool(executor: &Executor, tool_name: &str, args: Value) -> Result<Value, String> {
261    if tool_name == CONNECTION_STATUS_TOOL {
262        return Ok(executor.connection_status());
263    }
264    if let Some(id) = tool_name.strip_prefix("query:") {
265        return executor.execute_query(id, args).await;
266    }
267    if let Some(id) = tool_name.strip_prefix("view:") {
268        return executor.execute_view(id, args).await;
269    }
270    if let Some(id) = tool_name.strip_prefix("report:") {
271        return executor.execute_report(id, args).await;
272    }
273    if let Some(id) = tool_name.strip_prefix("command:") {
274        return executor.execute_command(id, args).await;
275    }
276    Err(format!("Unknown tool: {}", tool_name))
277}
278
279fn handle_resources_list(id: Value, filter: &ClientFilters) -> McpResponse {
280    let mut resources: Vec<McpResource> = Vec::new();
281
282    for reg in inventory::iter::<QueryRegistration> {
283        let tool_name = format!("query:{}", reg.query_id);
284        if !filter.tool_visible(&tool_name) {
285            continue;
286        }
287        resources.push(McpResource {
288            uri: format!("myko://schema/query/{}", reg.query_id),
289            name: reg.query_id.to_string(),
290            description: Some(format!("Query returning {} entities", reg.query_item_type)),
291            mime_type: Some("application/json".to_string()),
292        });
293    }
294
295    for reg in inventory::iter::<ViewRegistration> {
296        let tool_name = format!("view:{}", reg.view_id);
297        if !filter.tool_visible(&tool_name) {
298            continue;
299        }
300        resources.push(McpResource {
301            uri: format!("myko://schema/view/{}", reg.view_id),
302            name: reg.view_id.to_string(),
303            description: Some(format!("View returning a list of {}", reg.view_item_type)),
304            mime_type: Some("application/json".to_string()),
305        });
306    }
307
308    for reg in inventory::iter::<ReportRegistration> {
309        let tool_name = format!("report:{}", reg.report_id);
310        if !filter.tool_visible(&tool_name) {
311            continue;
312        }
313        resources.push(McpResource {
314            uri: format!("myko://schema/report/{}", reg.report_id),
315            name: reg.report_id.to_string(),
316            description: Some(format!("Report returning {}", reg.output_type)),
317            mime_type: Some("application/json".to_string()),
318        });
319    }
320
321    for reg in inventory::iter::<CommandRegistration> {
322        let tool_name = format!("command:{}", reg.command_id);
323        if !filter.tool_visible(&tool_name) {
324            continue;
325        }
326        resources.push(McpResource {
327            uri: format!("myko://schema/command/{}", reg.command_id),
328            name: format!("{} (command)", reg.command_id),
329            description: Some(format!("Command returning {}", reg.result_type)),
330            mime_type: Some("application/json".to_string()),
331        });
332    }
333
334    McpResponse::success(id, json!({ "resources": resources }))
335}
336
337fn handle_resources_read(id: Value, params: Option<Value>, filter: &ClientFilters) -> McpResponse {
338    let Some(params) = params else {
339        return McpResponse::error(id, McpError::invalid_params("Missing params"));
340    };
341    let Some(uri) = params.get("uri").and_then(|v| v.as_str()) else {
342        return McpResponse::error(id, McpError::invalid_params("Missing uri"));
343    };
344
345    if let Some(path) = uri.strip_prefix("myko://schema/") {
346        let parts: Vec<&str> = path.splitn(2, '/').collect();
347        if parts.len() == 2 {
348            let (schema_type, schema_id) = (parts[0], parts[1]);
349            let tool_name = format!("{}:{}", schema_type, schema_id);
350            if !filter.tool_visible(&tool_name) {
351                return McpResponse::error(
352                    id,
353                    McpError {
354                        code: McpError::INVALID_PARAMS,
355                        message: format!("Resource not accessible: {}", uri),
356                        data: None,
357                    },
358                );
359            }
360            let content = match schema_type {
361                "query" => get_query_schema(schema_id),
362                "view" => get_view_schema(schema_id),
363                "report" => get_report_schema(schema_id),
364                "command" => get_command_schema(schema_id),
365                _ => None,
366            };
367            if let Some(content) = content {
368                return McpResponse::success(
369                    id,
370                    json!({
371                        "contents": [{
372                            "uri": uri,
373                            "mimeType": "application/json",
374                            "text": content,
375                        }]
376                    }),
377                );
378            }
379        }
380    }
381
382    McpResponse::error(
383        id,
384        McpError {
385            code: McpError::INVALID_PARAMS,
386            message: format!("Resource not found: {}", uri),
387            data: None,
388        },
389    )
390}
391
392fn open_object_schema() -> Value {
393    json!({
394        "type": "object",
395        "additionalProperties": true
396    })
397}
398
399fn get_query_schema(query_id: &str) -> Option<String> {
400    for reg in inventory::iter::<QueryRegistration> {
401        if reg.query_id == query_id {
402            let schema = json!({
403                "$schema": "http://json-schema.org/draft-07/schema#",
404                "title": reg.query_id,
405                "description": format!("Query returning {} entities", reg.query_item_type),
406                "type": "object",
407                "additionalProperties": true,
408            });
409            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
410        }
411    }
412    None
413}
414
415fn get_view_schema(view_id: &str) -> Option<String> {
416    for reg in inventory::iter::<ViewRegistration> {
417        if reg.view_id == view_id {
418            let schema = json!({
419                "$schema": "http://json-schema.org/draft-07/schema#",
420                "title": reg.view_id,
421                "description": format!("View returning a list of {}", reg.view_item_type),
422                "type": "object",
423                "additionalProperties": true,
424            });
425            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
426        }
427    }
428    None
429}
430
431fn get_report_schema(report_id: &str) -> Option<String> {
432    for reg in inventory::iter::<ReportRegistration> {
433        if reg.report_id == report_id {
434            let schema = json!({
435                "$schema": "http://json-schema.org/draft-07/schema#",
436                "title": reg.report_id,
437                "description": format!("Report returning {}", reg.output_type),
438                "type": "object",
439                "additionalProperties": true,
440            });
441            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
442        }
443    }
444    None
445}
446
447fn get_command_schema(command_id: &str) -> Option<String> {
448    for reg in inventory::iter::<CommandRegistration> {
449        if reg.command_id == command_id {
450            let schema = json!({
451                "$schema": "http://json-schema.org/draft-07/schema#",
452                "title": reg.command_id,
453                "description": format!("Command returning {}", reg.result_type),
454                "type": "object",
455                "additionalProperties": true,
456            });
457            return Some(serde_json::to_string_pretty(&schema).unwrap_or_default());
458        }
459    }
460    None
461}
462
463#[cfg(test)]
464mod tests {
465    use super::*;
466    use serde_json::Value;
467
468    fn make_request(method: &str) -> McpRequest {
469        McpRequest {
470            jsonrpc: "2.0".to_string(),
471            id: Value::Number(1.into()),
472            method: method.to_string(),
473            params: None,
474        }
475    }
476
477    #[test]
478    fn server_info_default_omits_instructions() {
479        let info = ServerInfo::default();
480        assert_eq!(info.instructions, None);
481    }
482
483    #[test]
484    fn server_info_can_carry_instructions() {
485        let info = ServerInfo {
486            name: "test".into(),
487            version: "0.0.0".into(),
488            instructions: Some("test instructions text".into()),
489        };
490        assert_eq!(info.instructions.as_deref(), Some("test instructions text"));
491    }
492
493    #[tokio::test]
494    async fn initialize_returns_server_info() {
495        let filter = ClientFilters::allow_all();
496        let info = ServerInfo {
497            name: "test".into(),
498            version: "0.0.0".into(),
499            instructions: None,
500        };
501        // Executor is irrelevant for initialize but we need *some* executor;
502        // use an in-process one wrapped around a minimal ctx is heavy here,
503        // so just use a dummy MykoClient.
504        let client = std::sync::Arc::new(myko::client::MykoClient::new());
505        let executor = Executor::Client(client);
506        let response = handle_request(make_request("initialize"), &filter, &executor, &info)
507            .await
508            .expect("initialize must produce a response");
509        let result = response.result.expect("initialize must have a result");
510        assert_eq!(result["serverInfo"]["name"], "test");
511        assert_eq!(result["serverInfo"]["version"], "0.0.0");
512    }
513
514    #[tokio::test]
515    async fn initialize_includes_instructions_when_set() {
516        let filter = ClientFilters::allow_all();
517        let info = ServerInfo {
518            name: "pulse-mcp".into(),
519            version: "0.2.0".into(),
520            instructions: Some("teach me".into()),
521        };
522        let client = std::sync::Arc::new(myko::client::MykoClient::new());
523        let executor = Executor::Client(client);
524
525        let resp = handle_request(make_request("initialize"), &filter, &executor, &info)
526            .await
527            .expect("initialize must return a response");
528        let result = resp.result.expect("initialize must succeed");
529
530        assert_eq!(result["serverInfo"]["name"], json!("pulse-mcp"));
531        assert_eq!(result["serverInfo"]["version"], json!("0.2.0"));
532        assert_eq!(result["instructions"], json!("teach me"));
533    }
534
535    #[tokio::test]
536    async fn initialize_omits_instructions_when_unset() {
537        let filter = ClientFilters::allow_all();
538        let info = ServerInfo::default();
539        let client = std::sync::Arc::new(myko::client::MykoClient::new());
540        let executor = Executor::Client(client);
541
542        let resp = handle_request(make_request("initialize"), &filter, &executor, &info)
543            .await
544            .expect("response");
545        let result = resp.result.expect("ok");
546        assert!(
547            result.get("instructions").is_none(),
548            "instructions must be omitted when ServerInfo.instructions is None"
549        );
550    }
551
552    #[tokio::test]
553    async fn notifications_produce_no_response() {
554        let filter = ClientFilters::allow_all();
555        let info = ServerInfo::default();
556        let client = std::sync::Arc::new(myko::client::MykoClient::new());
557        let executor = Executor::Client(client);
558        assert!(
559            handle_request(
560                make_request("notifications/initialized"),
561                &filter,
562                &executor,
563                &info,
564            )
565            .await
566            .is_none()
567        );
568    }
569
570    #[tokio::test]
571    async fn unknown_method_returns_error() {
572        let filter = ClientFilters::allow_all();
573        let info = ServerInfo::default();
574        let client = std::sync::Arc::new(myko::client::MykoClient::new());
575        let executor = Executor::Client(client);
576        let response = handle_request(make_request("unknown/method"), &filter, &executor, &info)
577            .await
578            .expect("must produce a response");
579        assert!(response.error.is_some());
580    }
581}