Skip to main content

myko_server/mcp/
exec.rs

1//! Tool executor abstraction for MCP transports.
2//!
3//! Two execution paths share the same dispatch core:
4//!
5//! - [`Executor::Client`] — wraps a [`MykoClient`]; talks to a remote Myko server
6//!   over WebSocket. Used by the stdio MCP binary.
7//! - [`Executor::InProcess`] — talks directly to a [`CellServerCtx`]; used by the
8//!   HTTP/WS MCP endpoints hosted inside the server.
9
10use std::{
11    sync::{Arc, Mutex},
12    time::Duration,
13};
14
15use hyphae::{Gettable, Watchable};
16use myko::{
17    client::{ConnectionStatus, MykoClient},
18    command::{CommandContext, CommandHandlerRegistration},
19    query::QueryRegistration,
20    report::ReportRegistration,
21    request::RequestContext,
22    server::CellServerCtx,
23    view::ViewRegistration,
24    wire::{WrappedCommand, WrappedQuery, WrappedReport, WrappedView},
25};
26use serde_json::{Value, json};
27use tokio::sync::oneshot;
28use uuid::Uuid;
29
30const QUERY_TIMEOUT: Duration = Duration::from_secs(5);
31const REPORT_TIMEOUT: Duration = Duration::from_secs(5);
32const COMMAND_TIMEOUT: Duration = Duration::from_secs(10);
33const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
34
35/// How MCP dispatch reaches the underlying Myko queries / reports / commands.
36pub enum Executor {
37    /// Talk to a remote Myko server over WebSocket via a `MykoClient`.
38    Client(Arc<MykoClient>),
39    /// Talk to a server hosted in the same process via its `CellServerCtx`.
40    InProcess(Arc<CellServerCtx>),
41}
42
43impl Executor {
44    /// Execute a query and return its current items as JSON.
45    pub async fn execute_query(&self, query_id: &str, args: Value) -> Result<Value, String> {
46        match self {
47            Executor::Client(client) => client_execute_query(client.clone(), query_id, args).await,
48            Executor::InProcess(ctx) => in_process_execute_query(ctx.clone(), query_id, args),
49        }
50    }
51
52    /// Execute a report and return its current output as JSON.
53    pub async fn execute_report(&self, report_id: &str, args: Value) -> Result<Value, String> {
54        match self {
55            Executor::Client(client) => {
56                client_execute_report(client.clone(), report_id, args).await
57            }
58            Executor::InProcess(ctx) => {
59                in_process_execute_report(ctx.clone(), report_id, args).await
60            }
61        }
62    }
63
64    /// Execute a view (list-typed report) and return its current items as JSON.
65    pub async fn execute_view(&self, view_id: &str, args: Value) -> Result<Value, String> {
66        match self {
67            Executor::Client(client) => client_execute_view(client.clone(), view_id, args).await,
68            Executor::InProcess(ctx) => in_process_execute_view(ctx.clone(), view_id, args),
69        }
70    }
71
72    /// Execute a command and return its result as JSON.
73    pub async fn execute_command(&self, command_id: &str, args: Value) -> Result<Value, String> {
74        match self {
75            Executor::Client(client) => {
76                client_execute_command(client.clone(), command_id, args).await
77            }
78            Executor::InProcess(ctx) => in_process_execute_command(ctx.clone(), command_id, args),
79        }
80    }
81
82    /// Status string for the built-in `connection_status` tool.
83    pub fn connection_status(&self) -> Value {
84        match self {
85            Executor::Client(client) => {
86                let status = client.connection_status().get();
87                let text = match &status {
88                    ConnectionStatus::Connected(addr) => format!("Connected to {}", addr),
89                    ConnectionStatus::Connecting(addr) => format!("Connecting to {}", addr),
90                    ConnectionStatus::Reconnecting(addr) => format!("Reconnecting to {}", addr),
91                    ConnectionStatus::Idle => "Idle".to_string(),
92                    ConnectionStatus::Disconnected => "Disconnected".to_string(),
93                };
94                json!({ "status": text })
95            }
96            Executor::InProcess(_) => json!({ "status": "In-process (always connected)" }),
97        }
98    }
99}
100
101// ─────────────────────────────────────────────────────────────────────────────
102// Client-mode execution (stdio MCP path)
103// ─────────────────────────────────────────────────────────────────────────────
104
105async fn client_execute_query(
106    client: Arc<MykoClient>,
107    query_id: &str,
108    arguments: Value,
109) -> Result<Value, String> {
110    for reg in inventory::iter::<QueryRegistration> {
111        if reg.query_id == query_id {
112            let tx = Uuid::new_v4().to_string();
113            let mut query_json = arguments_object(arguments);
114            if let Some(obj) = query_json.as_object_mut() {
115                obj.insert("tx".to_string(), json!(tx));
116                obj.insert(
117                    "createdAt".to_string(),
118                    json!(chrono::Utc::now().to_rfc3339()),
119                );
120            }
121
122            let wrapped = WrappedQuery {
123                query: query_json,
124                query_id: reg.query_id.into(),
125                query_item_type: reg.query_item_type.into(),
126                window: None,
127            };
128
129            let cell = client.watch_query_raw(wrapped);
130            let (result_tx, result_rx) = oneshot::channel::<Vec<Value>>();
131            let result_tx = Arc::new(Mutex::new(Some(result_tx)));
132            let seen_initial = Arc::new(Mutex::new(false));
133            let result_tx_sub = result_tx.clone();
134            let seen_initial_sub = seen_initial.clone();
135            let _guard = cell.subscribe(move |signal| {
136                if let hyphae::Signal::Value(items) = signal {
137                    let mut seen = seen_initial_sub.lock().unwrap();
138                    if !*seen {
139                        *seen = true;
140                        return;
141                    }
142                    if let Some(tx) = result_tx_sub.lock().unwrap().take() {
143                        let _ = tx.send((**items).clone());
144                    }
145                }
146            });
147
148            return match tokio::time::timeout(QUERY_TIMEOUT, result_rx).await {
149                Ok(Ok(items)) => Ok(json!({
150                    "query_id": query_id,
151                    "item_type": reg.query_item_type,
152                    "count": items.len(),
153                    "items": items,
154                })),
155                Ok(Err(_)) => Err("Query channel closed".to_string()),
156                Err(_) => Err("Timeout waiting for query response".to_string()),
157            };
158        }
159    }
160    Err(format!("Query not found: {}", query_id))
161}
162
163async fn client_execute_view(
164    client: Arc<MykoClient>,
165    view_id: &str,
166    arguments: Value,
167) -> Result<Value, String> {
168    for reg in inventory::iter::<ViewRegistration> {
169        if reg.view_id == view_id {
170            let tx = Uuid::new_v4().to_string();
171            let mut view_json = arguments_object(arguments);
172            if let Some(obj) = view_json.as_object_mut() {
173                obj.insert("tx".to_string(), json!(tx));
174                obj.insert(
175                    "createdAt".to_string(),
176                    json!(chrono::Utc::now().to_rfc3339()),
177                );
178            }
179
180            let wrapped = WrappedView {
181                view: view_json,
182                view_id: reg.view_id.into(),
183                view_item_type: reg.view_item_type.into(),
184                window: None,
185            };
186
187            let cell = client.watch_view_raw(wrapped);
188            let (result_tx, result_rx) = oneshot::channel::<Vec<Value>>();
189            let result_tx = Arc::new(Mutex::new(Some(result_tx)));
190            let seen_initial = Arc::new(Mutex::new(false));
191            let result_tx_sub = result_tx.clone();
192            let seen_initial_sub = seen_initial.clone();
193            let _guard = cell.subscribe(move |signal| {
194                if let hyphae::Signal::Value(items) = signal {
195                    let mut seen = seen_initial_sub.lock().unwrap();
196                    if !*seen {
197                        *seen = true;
198                        return;
199                    }
200                    if let Some(tx) = result_tx_sub.lock().unwrap().take() {
201                        let _ = tx.send((**items).clone());
202                    }
203                }
204            });
205
206            return match tokio::time::timeout(QUERY_TIMEOUT, result_rx).await {
207                Ok(Ok(items)) => Ok(json!({
208                    "view_id": view_id,
209                    "item_type": reg.view_item_type,
210                    "count": items.len(),
211                    "items": items,
212                })),
213                Ok(Err(_)) => Err("View channel closed".to_string()),
214                Err(_) => Err("Timeout waiting for view response".to_string()),
215            };
216        }
217    }
218    Err(format!("View not found: {}", view_id))
219}
220
221async fn client_execute_report(
222    client: Arc<MykoClient>,
223    report_id: &str,
224    arguments: Value,
225) -> Result<Value, String> {
226    for reg in inventory::iter::<ReportRegistration> {
227        if reg.report_id == report_id {
228            let tx = Uuid::new_v4().to_string();
229            let mut report_json = arguments_object(arguments);
230            if let Some(obj) = report_json.as_object_mut() {
231                obj.insert("tx".to_string(), json!(tx));
232            }
233
234            let wrapped = WrappedReport {
235                report: report_json,
236                report_id: reg.report_id.to_string(),
237            };
238
239            let cell = client.watch_report_raw(wrapped);
240            let (result_tx, result_rx) = oneshot::channel::<Value>();
241            let result_tx = Arc::new(Mutex::new(Some(result_tx)));
242            let _guard = cell.subscribe(move |signal| {
243                if let hyphae::Signal::Value(value_opt) = signal
244                    && let Some(value) = &**value_opt
245                    && let Some(tx) = result_tx.lock().unwrap().take()
246                {
247                    let _ = tx.send(value.clone());
248                }
249            });
250
251            return match tokio::time::timeout(REPORT_TIMEOUT, result_rx).await {
252                Ok(Ok(value)) => Ok(json!({
253                    "report_id": report_id,
254                    "output_type": reg.output_type,
255                    "result": value,
256                })),
257                Ok(Err(_)) => Err("Report channel closed".to_string()),
258                Err(_) => Err("Timeout waiting for report response".to_string()),
259            };
260        }
261    }
262    Err(format!("Report not found: {}", report_id))
263}
264
265async fn client_execute_command(
266    client: Arc<MykoClient>,
267    command_id: &str,
268    arguments: Value,
269) -> Result<Value, String> {
270    let status = client.connection_status().get();
271    if !matches!(status, ConnectionStatus::Connected(_)) {
272        let (tx_connected, rx_connected) = oneshot::channel::<bool>();
273        let tx_connected = Mutex::new(Some(tx_connected));
274        let guard = client.connection_status().subscribe(move |signal| {
275            if let hyphae::Signal::Value(status) = signal
276                && let ConnectionStatus::Connected(_) = &**status
277                && let Some(sender) = tx_connected.lock().unwrap().take()
278            {
279                let _ = sender.send(true);
280            }
281        });
282
283        let connected = tokio::time::timeout(CONNECT_TIMEOUT, rx_connected)
284            .await
285            .unwrap_or(Ok(false))
286            .unwrap_or(false);
287        drop(guard);
288
289        if !connected {
290            return Err("Not connected to Myko server".to_string());
291        }
292    }
293
294    let tx = Uuid::new_v4().to_string();
295    let mut command_json = arguments_object(arguments);
296    if let Some(obj) = command_json.as_object_mut() {
297        obj.insert("tx".to_string(), json!(tx));
298    }
299
300    let wrapped = WrappedCommand {
301        command: command_json,
302        command_id: command_id.to_string(),
303    };
304
305    let result_cell = client.send_command_raw_result(wrapped);
306    let (resp_tx, resp_rx) = oneshot::channel::<Result<Value, String>>();
307    let resp_tx = Arc::new(Mutex::new(Some(resp_tx)));
308    let _guard = result_cell.subscribe(move |signal| {
309        if let hyphae::Signal::Value(result_opt) = signal
310            && let Some(result) = &**result_opt
311            && let Some(sender) = resp_tx.lock().unwrap().take()
312        {
313            let _ = sender.send(result.clone());
314        }
315    });
316
317    match tokio::time::timeout(COMMAND_TIMEOUT, resp_rx).await {
318        Ok(Ok(Ok(response))) => Ok(json!({
319            "command_id": command_id,
320            "success": true,
321            "result": response,
322        })),
323        Ok(Ok(Err(e))) => Err(e),
324        _ => Err("Timeout waiting for response".to_string()),
325    }
326}
327
328// ─────────────────────────────────────────────────────────────────────────────
329// In-process execution (HTTP/WS MCP path)
330// ─────────────────────────────────────────────────────────────────────────────
331
332fn in_process_execute_query(
333    ctx: Arc<CellServerCtx>,
334    query_id: &str,
335    arguments: Value,
336) -> Result<Value, String> {
337    let registration = inventory::iter::<QueryRegistration>
338        .into_iter()
339        .find(|r| r.query_id == query_id)
340        .ok_or_else(|| format!("Query not found: {}", query_id))?;
341
342    let query_data = ctx
343        .handler_registry
344        .get_query(query_id)
345        .ok_or_else(|| format!("Query handler not registered: {}", query_id))?;
346
347    let mut query_json = arguments_object(arguments);
348    let tx: Arc<str> = Uuid::new_v4().to_string().into();
349    if let Some(obj) = query_json.as_object_mut() {
350        obj.insert("tx".to_string(), json!(tx.as_ref()));
351        obj.insert(
352            "createdAt".to_string(),
353            json!(chrono::Utc::now().to_rfc3339()),
354        );
355    }
356
357    let parsed = (query_data.parse)(query_json)
358        .map_err(|e| format!("Failed to parse query {}: {}", query_id, e))?;
359
360    let request_context = Arc::new(RequestContext::internal(tx, ctx.host_id, "mcp"));
361
362    let cellmap = (query_data.cell_factory)(
363        parsed,
364        ctx.registry.clone(),
365        request_context,
366        Some(ctx.clone()),
367    )
368    .map_err(|e| format!("Failed to build query cell: {}", e))?;
369
370    let items: Vec<Value> = cellmap
371        .snapshot()
372        .into_iter()
373        .map(|(_, item)| serde_json::to_value(&*item).unwrap_or(Value::Null))
374        .collect();
375
376    Ok(json!({
377        "query_id": query_id,
378        "item_type": registration.query_item_type,
379        "count": items.len(),
380        "items": items,
381    }))
382}
383
384fn in_process_execute_view(
385    ctx: Arc<CellServerCtx>,
386    view_id: &str,
387    arguments: Value,
388) -> Result<Value, String> {
389    let registration = inventory::iter::<ViewRegistration>
390        .into_iter()
391        .find(|r| r.view_id == view_id)
392        .ok_or_else(|| format!("View not found: {}", view_id))?;
393
394    let view_data = ctx
395        .handler_registry
396        .get_view(view_id)
397        .ok_or_else(|| format!("View handler not registered: {}", view_id))?;
398
399    let mut view_json = arguments_object(arguments);
400    let tx: Arc<str> = Uuid::new_v4().to_string().into();
401    if let Some(obj) = view_json.as_object_mut() {
402        obj.insert("tx".to_string(), json!(tx.as_ref()));
403        obj.insert(
404            "createdAt".to_string(),
405            json!(chrono::Utc::now().to_rfc3339()),
406        );
407    }
408
409    let parsed = (view_data.parse)(view_json)
410        .map_err(|e| format!("Failed to parse view {}: {}", view_id, e))?;
411
412    let request_context = Arc::new(RequestContext::internal(tx, ctx.host_id, "mcp"));
413
414    let cellmap =
415        (view_data.cell_factory)(parsed, ctx.registry.clone(), request_context, ctx.clone())
416            .map_err(|e| format!("Failed to build view cell: {}", e))?;
417
418    let items: Vec<Value> = cellmap
419        .snapshot()
420        .into_iter()
421        .map(|(_, item)| serde_json::to_value(&*item).unwrap_or(Value::Null))
422        .collect();
423
424    Ok(json!({
425        "view_id": view_id,
426        "item_type": registration.view_item_type,
427        "count": items.len(),
428        "items": items,
429    }))
430}
431
432async fn in_process_execute_report(
433    ctx: Arc<CellServerCtx>,
434    report_id: &str,
435    arguments: Value,
436) -> Result<Value, String> {
437    let registration = inventory::iter::<ReportRegistration>
438        .into_iter()
439        .find(|r| r.report_id == report_id)
440        .ok_or_else(|| format!("Report not found: {}", report_id))?;
441
442    let report_data = ctx
443        .handler_registry
444        .get_report(report_id)
445        .ok_or_else(|| format!("Report handler not registered: {}", report_id))?;
446
447    let mut report_json = arguments_object(arguments);
448    let tx: Arc<str> = Uuid::new_v4().to_string().into();
449    if let Some(obj) = report_json.as_object_mut() {
450        obj.insert("tx".to_string(), json!(tx.as_ref()));
451    }
452
453    let parsed = (report_data.parse)(report_json)
454        .map_err(|e| format!("Failed to parse report {}: {}", report_id, e))?;
455
456    let request_context = Arc::new(RequestContext::internal(tx, ctx.host_id, "mcp"));
457
458    let cell = (report_data.cell_factory)(parsed, request_context, ctx)
459        .map_err(|e| format!("Failed to build report cell: {}", e))?;
460
461    // Subscribe to drive reactive evaluation; capture the first emission.
462    let (tx_resp, rx_resp) = oneshot::channel::<Value>();
463    let tx_resp = Arc::new(Mutex::new(Some(tx_resp)));
464    let tx_resp_sub = tx_resp.clone();
465    let _guard = cell.subscribe(move |signal| {
466        if let hyphae::Signal::Value(output) = signal
467            && let Some(sender) = tx_resp_sub.lock().unwrap().take()
468        {
469            let _ = sender.send(output.to_value());
470        }
471    });
472
473    match tokio::time::timeout(REPORT_TIMEOUT, rx_resp).await {
474        Ok(Ok(value)) => Ok(json!({
475            "report_id": report_id,
476            "output_type": registration.output_type,
477            "result": value,
478        })),
479        Ok(Err(_)) => Err("Report cell dropped before emitting".to_string()),
480        Err(_) => Err("Timeout waiting for report value".to_string()),
481    }
482}
483
484fn in_process_execute_command(
485    ctx: Arc<CellServerCtx>,
486    command_id: &str,
487    arguments: Value,
488) -> Result<Value, String> {
489    let mut command_json = arguments_object(arguments);
490    let tx: Arc<str> = Uuid::new_v4().to_string().into();
491    if let Some(obj) = command_json.as_object_mut() {
492        obj.insert("tx".to_string(), json!(tx.as_ref()));
493    }
494
495    for registration in inventory::iter::<CommandHandlerRegistration> {
496        if registration.command_id == command_id {
497            let executor = (registration.factory)();
498            let req = Arc::new(RequestContext::internal(tx.clone(), ctx.host_id, "mcp"));
499            let cmd_id: Arc<str> = Arc::from(command_id);
500            let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
501
502            return match executor.execute_from_value(command_json, cmd_ctx) {
503                Ok(result) => Ok(json!({
504                    "command_id": command_id,
505                    "success": true,
506                    "result": result,
507                })),
508                Err(err) => Err(err.message),
509            };
510        }
511    }
512
513    Err(format!("Command handler not found: {}", command_id))
514}
515
516// ─────────────────────────────────────────────────────────────────────────────
517// Helpers
518// ─────────────────────────────────────────────────────────────────────────────
519
520fn arguments_object(arguments: Value) -> Value {
521    if arguments.is_object() {
522        arguments
523    } else {
524        json!({})
525    }
526}