1use std::{
11 sync::{Arc, Mutex},
12 time::Duration,
13};
14
15use hyphae::{Gettable, Watchable};
16use myko::{
17 client::{ConnectionStatus, MykoClient},
18 command::{CommandContext, CommandHandlerRegistration},
19 query::QueryRegistration,
20 report::ReportRegistration,
21 request::RequestContext,
22 server::CellServerCtx,
23 view::ViewRegistration,
24 wire::{WrappedCommand, WrappedQuery, WrappedReport, WrappedView},
25};
26use serde_json::{Value, json};
27use tokio::sync::oneshot;
28use uuid::Uuid;
29
/// Maximum time `client_execute_query` and `client_execute_view` wait for a result.
const QUERY_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum time the client and in-process report executors wait for a report value.
const REPORT_TIMEOUT: Duration = Duration::from_secs(5);
/// Maximum time `client_execute_command` waits for the command result.
const COMMAND_TIMEOUT: Duration = Duration::from_secs(10);
/// Maximum time `client_execute_command` waits for the client to report a connected status.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(5);
34
/// Backend used to execute queries, views, reports, and commands.
pub enum Executor {
    /// Execute requests through a shared [`MykoClient`].
    Client(Arc<MykoClient>),
    /// Execute requests directly against a shared [`CellServerCtx`] in this process.
    InProcess(Arc<CellServerCtx>),
}
42
43impl Executor {
44 pub async fn execute_query(&self, query_id: &str, args: Value) -> Result<Value, String> {
46 match self {
47 Executor::Client(client) => client_execute_query(client.clone(), query_id, args).await,
48 Executor::InProcess(ctx) => in_process_execute_query(ctx.clone(), query_id, args),
49 }
50 }
51
52 pub async fn execute_report(&self, report_id: &str, args: Value) -> Result<Value, String> {
54 match self {
55 Executor::Client(client) => {
56 client_execute_report(client.clone(), report_id, args).await
57 }
58 Executor::InProcess(ctx) => {
59 in_process_execute_report(ctx.clone(), report_id, args).await
60 }
61 }
62 }
63
64 pub async fn execute_view(&self, view_id: &str, args: Value) -> Result<Value, String> {
66 match self {
67 Executor::Client(client) => client_execute_view(client.clone(), view_id, args).await,
68 Executor::InProcess(ctx) => in_process_execute_view(ctx.clone(), view_id, args),
69 }
70 }
71
72 pub async fn execute_command(&self, command_id: &str, args: Value) -> Result<Value, String> {
74 match self {
75 Executor::Client(client) => {
76 client_execute_command(client.clone(), command_id, args).await
77 }
78 Executor::InProcess(ctx) => in_process_execute_command(ctx.clone(), command_id, args),
79 }
80 }
81
82 pub fn connection_status(&self) -> Value {
84 match self {
85 Executor::Client(client) => {
86 let status = client.connection_status().get();
87 let text = match &status {
88 ConnectionStatus::Connected(addr) => format!("Connected to {}", addr),
89 ConnectionStatus::Connecting(addr) => format!("Connecting to {}", addr),
90 ConnectionStatus::Reconnecting(addr) => format!("Reconnecting to {}", addr),
91 ConnectionStatus::Idle => "Idle".to_string(),
92 ConnectionStatus::Disconnected => "Disconnected".to_string(),
93 };
94 json!({ "status": text })
95 }
96 Executor::InProcess(_) => json!({ "status": "In-process (always connected)" }),
97 }
98 }
99}
100
101async fn client_execute_query(
106 client: Arc<MykoClient>,
107 query_id: &str,
108 arguments: Value,
109) -> Result<Value, String> {
110 for reg in inventory::iter::<QueryRegistration> {
111 if reg.query_id == query_id {
112 let tx = Uuid::new_v4().to_string();
113 let mut query_json = arguments_object(arguments);
114 if let Some(obj) = query_json.as_object_mut() {
115 obj.insert("tx".to_string(), json!(tx));
116 obj.insert(
117 "createdAt".to_string(),
118 json!(chrono::Utc::now().to_rfc3339()),
119 );
120 }
121
122 let wrapped = WrappedQuery {
123 query: query_json,
124 query_id: reg.query_id.into(),
125 query_item_type: reg.query_item_type.into(),
126 window: None,
127 };
128
129 let cell = client.watch_query_raw(wrapped);
130 let (result_tx, result_rx) = oneshot::channel::<Vec<Value>>();
131 let result_tx = Arc::new(Mutex::new(Some(result_tx)));
132 let seen_initial = Arc::new(Mutex::new(false));
133 let result_tx_sub = result_tx.clone();
134 let seen_initial_sub = seen_initial.clone();
135 let _guard = cell.subscribe(move |signal| {
136 if let hyphae::Signal::Value(items) = signal {
137 let mut seen = seen_initial_sub.lock().unwrap();
138 if !*seen {
139 *seen = true;
140 return;
141 }
142 if let Some(tx) = result_tx_sub.lock().unwrap().take() {
143 let _ = tx.send((**items).clone());
144 }
145 }
146 });
147
148 return match tokio::time::timeout(QUERY_TIMEOUT, result_rx).await {
149 Ok(Ok(items)) => Ok(json!({
150 "query_id": query_id,
151 "item_type": reg.query_item_type,
152 "count": items.len(),
153 "items": items,
154 })),
155 Ok(Err(_)) => Err("Query channel closed".to_string()),
156 Err(_) => Err("Timeout waiting for query response".to_string()),
157 };
158 }
159 }
160 Err(format!("Query not found: {}", query_id))
161}
162
163async fn client_execute_view(
164 client: Arc<MykoClient>,
165 view_id: &str,
166 arguments: Value,
167) -> Result<Value, String> {
168 for reg in inventory::iter::<ViewRegistration> {
169 if reg.view_id == view_id {
170 let tx = Uuid::new_v4().to_string();
171 let mut view_json = arguments_object(arguments);
172 if let Some(obj) = view_json.as_object_mut() {
173 obj.insert("tx".to_string(), json!(tx));
174 obj.insert(
175 "createdAt".to_string(),
176 json!(chrono::Utc::now().to_rfc3339()),
177 );
178 }
179
180 let wrapped = WrappedView {
181 view: view_json,
182 view_id: reg.view_id.into(),
183 view_item_type: reg.view_item_type.into(),
184 window: None,
185 };
186
187 let cell = client.watch_view_raw(wrapped);
188 let (result_tx, result_rx) = oneshot::channel::<Vec<Value>>();
189 let result_tx = Arc::new(Mutex::new(Some(result_tx)));
190 let seen_initial = Arc::new(Mutex::new(false));
191 let result_tx_sub = result_tx.clone();
192 let seen_initial_sub = seen_initial.clone();
193 let _guard = cell.subscribe(move |signal| {
194 if let hyphae::Signal::Value(items) = signal {
195 let mut seen = seen_initial_sub.lock().unwrap();
196 if !*seen {
197 *seen = true;
198 return;
199 }
200 if let Some(tx) = result_tx_sub.lock().unwrap().take() {
201 let _ = tx.send((**items).clone());
202 }
203 }
204 });
205
206 return match tokio::time::timeout(QUERY_TIMEOUT, result_rx).await {
207 Ok(Ok(items)) => Ok(json!({
208 "view_id": view_id,
209 "item_type": reg.view_item_type,
210 "count": items.len(),
211 "items": items,
212 })),
213 Ok(Err(_)) => Err("View channel closed".to_string()),
214 Err(_) => Err("Timeout waiting for view response".to_string()),
215 };
216 }
217 }
218 Err(format!("View not found: {}", view_id))
219}
220
221async fn client_execute_report(
222 client: Arc<MykoClient>,
223 report_id: &str,
224 arguments: Value,
225) -> Result<Value, String> {
226 for reg in inventory::iter::<ReportRegistration> {
227 if reg.report_id == report_id {
228 let tx = Uuid::new_v4().to_string();
229 let mut report_json = arguments_object(arguments);
230 if let Some(obj) = report_json.as_object_mut() {
231 obj.insert("tx".to_string(), json!(tx));
232 }
233
234 let wrapped = WrappedReport {
235 report: report_json,
236 report_id: reg.report_id.to_string(),
237 };
238
239 let cell = client.watch_report_raw(wrapped);
240 let (result_tx, result_rx) = oneshot::channel::<Value>();
241 let result_tx = Arc::new(Mutex::new(Some(result_tx)));
242 let _guard = cell.subscribe(move |signal| {
243 if let hyphae::Signal::Value(value_opt) = signal
244 && let Some(value) = &**value_opt
245 && let Some(tx) = result_tx.lock().unwrap().take()
246 {
247 let _ = tx.send(value.clone());
248 }
249 });
250
251 return match tokio::time::timeout(REPORT_TIMEOUT, result_rx).await {
252 Ok(Ok(value)) => Ok(json!({
253 "report_id": report_id,
254 "output_type": reg.output_type,
255 "result": value,
256 })),
257 Ok(Err(_)) => Err("Report channel closed".to_string()),
258 Err(_) => Err("Timeout waiting for report response".to_string()),
259 };
260 }
261 }
262 Err(format!("Report not found: {}", report_id))
263}
264
265async fn client_execute_command(
266 client: Arc<MykoClient>,
267 command_id: &str,
268 arguments: Value,
269) -> Result<Value, String> {
270 let status = client.connection_status().get();
271 if !matches!(status, ConnectionStatus::Connected(_)) {
272 let (tx_connected, rx_connected) = oneshot::channel::<bool>();
273 let tx_connected = Mutex::new(Some(tx_connected));
274 let guard = client.connection_status().subscribe(move |signal| {
275 if let hyphae::Signal::Value(status) = signal
276 && let ConnectionStatus::Connected(_) = &**status
277 && let Some(sender) = tx_connected.lock().unwrap().take()
278 {
279 let _ = sender.send(true);
280 }
281 });
282
283 let connected = tokio::time::timeout(CONNECT_TIMEOUT, rx_connected)
284 .await
285 .unwrap_or(Ok(false))
286 .unwrap_or(false);
287 drop(guard);
288
289 if !connected {
290 return Err("Not connected to Myko server".to_string());
291 }
292 }
293
294 let tx = Uuid::new_v4().to_string();
295 let mut command_json = arguments_object(arguments);
296 if let Some(obj) = command_json.as_object_mut() {
297 obj.insert("tx".to_string(), json!(tx));
298 }
299
300 let wrapped = WrappedCommand {
301 command: command_json,
302 command_id: command_id.to_string(),
303 };
304
305 let result_cell = client.send_command_raw_result(wrapped);
306 let (resp_tx, resp_rx) = oneshot::channel::<Result<Value, String>>();
307 let resp_tx = Arc::new(Mutex::new(Some(resp_tx)));
308 let _guard = result_cell.subscribe(move |signal| {
309 if let hyphae::Signal::Value(result_opt) = signal
310 && let Some(result) = &**result_opt
311 && let Some(sender) = resp_tx.lock().unwrap().take()
312 {
313 let _ = sender.send(result.clone());
314 }
315 });
316
317 match tokio::time::timeout(COMMAND_TIMEOUT, resp_rx).await {
318 Ok(Ok(Ok(response))) => Ok(json!({
319 "command_id": command_id,
320 "success": true,
321 "result": response,
322 })),
323 Ok(Ok(Err(e))) => Err(e),
324 _ => Err("Timeout waiting for response".to_string()),
325 }
326}
327
328fn in_process_execute_query(
333 ctx: Arc<CellServerCtx>,
334 query_id: &str,
335 arguments: Value,
336) -> Result<Value, String> {
337 let registration = inventory::iter::<QueryRegistration>
338 .into_iter()
339 .find(|r| r.query_id == query_id)
340 .ok_or_else(|| format!("Query not found: {}", query_id))?;
341
342 let query_data = ctx
343 .handler_registry
344 .get_query(query_id)
345 .ok_or_else(|| format!("Query handler not registered: {}", query_id))?;
346
347 let mut query_json = arguments_object(arguments);
348 let tx: Arc<str> = Uuid::new_v4().to_string().into();
349 if let Some(obj) = query_json.as_object_mut() {
350 obj.insert("tx".to_string(), json!(tx.as_ref()));
351 obj.insert(
352 "createdAt".to_string(),
353 json!(chrono::Utc::now().to_rfc3339()),
354 );
355 }
356
357 let parsed = (query_data.parse)(query_json)
358 .map_err(|e| format!("Failed to parse query {}: {}", query_id, e))?;
359
360 let request_context = Arc::new(RequestContext::internal(tx, ctx.host_id, "mcp"));
361
362 let cellmap = (query_data.cell_factory)(
363 parsed,
364 ctx.registry.clone(),
365 request_context,
366 Some(ctx.clone()),
367 )
368 .map_err(|e| format!("Failed to build query cell: {}", e))?;
369
370 let items: Vec<Value> = cellmap
371 .snapshot()
372 .into_iter()
373 .map(|(_, item)| serde_json::to_value(&*item).unwrap_or(Value::Null))
374 .collect();
375
376 Ok(json!({
377 "query_id": query_id,
378 "item_type": registration.query_item_type,
379 "count": items.len(),
380 "items": items,
381 }))
382}
383
384fn in_process_execute_view(
385 ctx: Arc<CellServerCtx>,
386 view_id: &str,
387 arguments: Value,
388) -> Result<Value, String> {
389 let registration = inventory::iter::<ViewRegistration>
390 .into_iter()
391 .find(|r| r.view_id == view_id)
392 .ok_or_else(|| format!("View not found: {}", view_id))?;
393
394 let view_data = ctx
395 .handler_registry
396 .get_view(view_id)
397 .ok_or_else(|| format!("View handler not registered: {}", view_id))?;
398
399 let mut view_json = arguments_object(arguments);
400 let tx: Arc<str> = Uuid::new_v4().to_string().into();
401 if let Some(obj) = view_json.as_object_mut() {
402 obj.insert("tx".to_string(), json!(tx.as_ref()));
403 obj.insert(
404 "createdAt".to_string(),
405 json!(chrono::Utc::now().to_rfc3339()),
406 );
407 }
408
409 let parsed = (view_data.parse)(view_json)
410 .map_err(|e| format!("Failed to parse view {}: {}", view_id, e))?;
411
412 let request_context = Arc::new(RequestContext::internal(tx, ctx.host_id, "mcp"));
413
414 let cellmap =
415 (view_data.cell_factory)(parsed, ctx.registry.clone(), request_context, ctx.clone())
416 .map_err(|e| format!("Failed to build view cell: {}", e))?;
417
418 let items: Vec<Value> = cellmap
419 .snapshot()
420 .into_iter()
421 .map(|(_, item)| serde_json::to_value(&*item).unwrap_or(Value::Null))
422 .collect();
423
424 Ok(json!({
425 "view_id": view_id,
426 "item_type": registration.view_item_type,
427 "count": items.len(),
428 "items": items,
429 }))
430}
431
432async fn in_process_execute_report(
433 ctx: Arc<CellServerCtx>,
434 report_id: &str,
435 arguments: Value,
436) -> Result<Value, String> {
437 let registration = inventory::iter::<ReportRegistration>
438 .into_iter()
439 .find(|r| r.report_id == report_id)
440 .ok_or_else(|| format!("Report not found: {}", report_id))?;
441
442 let report_data = ctx
443 .handler_registry
444 .get_report(report_id)
445 .ok_or_else(|| format!("Report handler not registered: {}", report_id))?;
446
447 let mut report_json = arguments_object(arguments);
448 let tx: Arc<str> = Uuid::new_v4().to_string().into();
449 if let Some(obj) = report_json.as_object_mut() {
450 obj.insert("tx".to_string(), json!(tx.as_ref()));
451 }
452
453 let parsed = (report_data.parse)(report_json)
454 .map_err(|e| format!("Failed to parse report {}: {}", report_id, e))?;
455
456 let request_context = Arc::new(RequestContext::internal(tx, ctx.host_id, "mcp"));
457
458 let cell = (report_data.cell_factory)(parsed, request_context, ctx)
459 .map_err(|e| format!("Failed to build report cell: {}", e))?;
460
461 let (tx_resp, rx_resp) = oneshot::channel::<Value>();
463 let tx_resp = Arc::new(Mutex::new(Some(tx_resp)));
464 let tx_resp_sub = tx_resp.clone();
465 let _guard = cell.subscribe(move |signal| {
466 if let hyphae::Signal::Value(output) = signal
467 && let Some(sender) = tx_resp_sub.lock().unwrap().take()
468 {
469 let _ = sender.send(output.to_value());
470 }
471 });
472
473 match tokio::time::timeout(REPORT_TIMEOUT, rx_resp).await {
474 Ok(Ok(value)) => Ok(json!({
475 "report_id": report_id,
476 "output_type": registration.output_type,
477 "result": value,
478 })),
479 Ok(Err(_)) => Err("Report cell dropped before emitting".to_string()),
480 Err(_) => Err("Timeout waiting for report value".to_string()),
481 }
482}
483
484fn in_process_execute_command(
485 ctx: Arc<CellServerCtx>,
486 command_id: &str,
487 arguments: Value,
488) -> Result<Value, String> {
489 let mut command_json = arguments_object(arguments);
490 let tx: Arc<str> = Uuid::new_v4().to_string().into();
491 if let Some(obj) = command_json.as_object_mut() {
492 obj.insert("tx".to_string(), json!(tx.as_ref()));
493 }
494
495 for registration in inventory::iter::<CommandHandlerRegistration> {
496 if registration.command_id == command_id {
497 let executor = (registration.factory)();
498 let req = Arc::new(RequestContext::internal(tx.clone(), ctx.host_id, "mcp"));
499 let cmd_id: Arc<str> = Arc::from(command_id);
500 let cmd_ctx = CommandContext::new(cmd_id, req, ctx.clone());
501
502 return match executor.execute_from_value(command_json, cmd_ctx) {
503 Ok(result) => Ok(json!({
504 "command_id": command_id,
505 "success": true,
506 "result": result,
507 })),
508 Err(err) => Err(err.message),
509 };
510 }
511 }
512
513 Err(format!("Command handler not found: {}", command_id))
514}
515
516fn arguments_object(arguments: Value) -> Value {
521 if arguments.is_object() {
522 arguments
523 } else {
524 json!({})
525 }
526}