1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::sync::{Arc, OnceLock};
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow};
7use jsonschema::{Draft, JSONSchema};
8use rmcp::model::{
9 CallToolRequestParams, CallToolResult, ErrorCode, ErrorData, Implementation, InitializeResult,
10 ListToolsResult, PaginatedRequestParams, ServerCapabilities, Tool, ToolAnnotations,
11};
12use rmcp::service::{RequestContext, RoleServer};
13use rmcp::{ServerHandler, ServiceExt};
14use serde::Deserialize;
15use serde_json::{Map, Value, json};
16use tokio::sync::{Mutex as AsyncMutex, broadcast};
17
18use crate::config::ApiConfig;
19use crate::errors::ApiError;
20use crate::protocol::{KNOWN_METHODS, MUTATING_METHODS};
21use crate::runtime::RpcRuntime;
22use crate::stream_domain::StreamEventEnvelope;
23
/// Identity string handed to `RpcRuntime::invoke_method` for every tool call made through MCP.
const MCP_PRINCIPAL: &str = "trusted_local";

/// Largest number of tools returned in a single `tools/list` page.
const TOOL_PAGE_SIZE: usize = 128;

/// `stream.next` wait, in milliseconds, used when the caller omits `waitMs`.
const DEFAULT_STREAM_NEXT_WAIT_MS: u64 = 30_000;

/// Upper bound, in milliseconds, that `waitMs` is clamped to.
const MAX_STREAM_NEXT_WAIT_MS: u64 = 120_000;

/// `stream.next` batch size used when the caller omits `maxEvents`.
const DEFAULT_STREAM_NEXT_EVENTS: u16 = 50;

/// Upper bound that `maxEvents` is clamped to.
const MAX_STREAM_NEXT_EVENTS: u16 = 200;
30
31#[derive(Clone)]
32pub struct RalphMcpServer {
33 runtime: RpcRuntime,
34 catalog: Arc<ToolCatalog>,
35 subscriptions: Arc<AsyncMutex<HashMap<String, Arc<AsyncMutex<McpSubscriptionState>>>>>,
36}
37
38impl RalphMcpServer {
39 pub fn new(config: ApiConfig) -> Result<Self> {
40 let runtime = RpcRuntime::new(config)?;
41 let catalog = Arc::new(tool_catalog().clone());
42 Ok(Self {
43 runtime,
44 catalog,
45 subscriptions: Arc::new(AsyncMutex::new(HashMap::new())),
46 })
47 }
48
49 fn invoke_rpc_tool(
50 &self,
51 method: &str,
52 arguments: Value,
53 request_id: &str,
54 tool_name: &str,
55 ) -> Result<Value, ApiError> {
56 let idempotency_key = MUTATING_METHODS
57 .contains(&method)
58 .then(|| format!("mcp:{tool_name}:{request_id}"));
59 self.runtime.invoke_method(
60 request_id.to_string(),
61 method,
62 arguments,
63 MCP_PRINCIPAL,
64 idempotency_key,
65 )
66 }
67
68 fn call_rpc_tool(
69 &self,
70 method: &str,
71 arguments: Value,
72 request_id: &str,
73 tool_name: &str,
74 ) -> Result<CallToolResult, ApiError> {
75 let result = self.invoke_rpc_tool(method, arguments, request_id, tool_name)?;
76 Ok(CallToolResult::structured(result))
77 }
78
79 async fn call_stream_subscribe(
80 &self,
81 arguments: Value,
82 request_id: &str,
83 tool_name: &str,
84 ) -> Result<CallToolResult, ApiError> {
85 let live_rx = self.runtime.stream_domain().live_receiver();
86 let result = self.invoke_rpc_tool("stream.subscribe", arguments, request_id, tool_name)?;
87 let subscription_id = subscription_id_from_result(&result)
88 .ok_or_else(|| ApiError::internal("stream.subscribe result missing subscriptionId"))
89 .map_err(|error| {
90 error.with_context(request_id.to_string(), Some("stream.subscribe".to_string()))
91 })?;
92 self.subscriptions.lock().await.insert(
93 subscription_id.to_string(),
94 Arc::new(AsyncMutex::new(McpSubscriptionState { live_rx })),
95 );
96 Ok(CallToolResult::structured(result))
97 }
98
99 async fn call_stream_unsubscribe(
100 &self,
101 arguments: Value,
102 request_id: &str,
103 tool_name: &str,
104 ) -> Result<CallToolResult, ApiError> {
105 let subscription_id = subscription_id_from_arguments(&arguments)
106 .ok_or_else(|| ApiError::internal("stream.unsubscribe args missing subscriptionId"))
107 .map_err(|error| {
108 error.with_context(
109 request_id.to_string(),
110 Some("stream.unsubscribe".to_string()),
111 )
112 })?
113 .to_string();
114 let result =
115 self.invoke_rpc_tool("stream.unsubscribe", arguments, request_id, tool_name)?;
116 self.subscriptions.lock().await.remove(&subscription_id);
117 Ok(CallToolResult::structured(result))
118 }
119
120 async fn call_stream_next(
121 &self,
122 arguments: Value,
123 request_id: &str,
124 ) -> Result<CallToolResult, ApiError> {
125 let params: StreamNextParams = serde_json::from_value(arguments).map_err(|error| {
126 ApiError::invalid_params(format!("invalid params for method 'stream.next': {error}"))
127 .with_context(request_id.to_string(), Some("stream.next".to_string()))
128 })?;
129 let wait_ms = params
130 .wait_ms
131 .unwrap_or(DEFAULT_STREAM_NEXT_WAIT_MS)
132 .clamp(1, MAX_STREAM_NEXT_WAIT_MS);
133 let max_events = params
134 .max_events
135 .unwrap_or(DEFAULT_STREAM_NEXT_EVENTS)
136 .clamp(1, MAX_STREAM_NEXT_EVENTS) as usize;
137
138 let streams = self.runtime.stream_domain();
139 if !streams.has_subscription(¶ms.subscription_id) {
140 return Err(ApiError::not_found(format!(
141 "subscription '{}' not found",
142 params.subscription_id
143 ))
144 .with_context(request_id.to_string(), Some("stream.next".to_string()))
145 .with_details(json!({ "subscriptionId": params.subscription_id })));
146 }
147
148 let subscription = self.lookup_subscription(¶ms.subscription_id).await?;
149 let replay = streams
150 .replay_for_subscription(¶ms.subscription_id)
151 .map_err(|error| {
152 error.with_context(request_id.to_string(), Some("stream.next".to_string()))
153 })?;
154
155 let mut events = replay.events;
156 let mut dropped_count = replay.dropped_count;
157 if !events.is_empty() || dropped_count > 0 {
158 events.truncate(max_events);
159 return Ok(stream_next_result(events, false, dropped_count));
160 }
161
162 let mut live_state = subscription.lock().await;
163 let current_cursor = streams
164 .subscription_cursor(¶ms.subscription_id)
165 .map_err(|error| {
166 error.with_context(request_id.to_string(), Some("stream.next".to_string()))
167 })?;
168 let current_cursor_sequence = streams
169 .subscription_cursor_sequence(¶ms.subscription_id)
170 .map_err(|error| {
171 error.with_context(request_id.to_string(), Some("stream.next".to_string()))
172 })?;
173
174 if let Some(result) = collect_live_events(
175 &streams,
176 ¶ms.subscription_id,
177 ¤t_cursor,
178 current_cursor_sequence,
179 &mut live_state.live_rx,
180 max_events,
181 dropped_count,
182 ) {
183 return Ok(stream_next_result(
184 result.events,
185 false,
186 result.dropped_count,
187 ));
188 }
189
190 let timer = tokio::time::sleep(Duration::from_millis(wait_ms));
191 tokio::pin!(timer);
192
193 loop {
194 tokio::select! {
195 _ = &mut timer => {
196 return Ok(stream_next_result(Vec::new(), true, dropped_count));
197 }
198 message = live_state.live_rx.recv() => {
199 match message {
200 Ok(event) => {
201 if is_pending_live_event(
202 &streams,
203 ¶ms.subscription_id,
204 ¤t_cursor,
205 current_cursor_sequence,
206 &event,
207 ) {
208 events.push(event);
209 let drained = collect_live_events(
210 &streams,
211 ¶ms.subscription_id,
212 ¤t_cursor,
213 current_cursor_sequence,
214 &mut live_state.live_rx,
215 max_events - events.len(),
216 dropped_count,
217 );
218 if let Some(drained) = drained {
219 dropped_count = drained.dropped_count;
220 events.extend(drained.events);
221 }
222 return Ok(stream_next_result(events, false, dropped_count));
223 }
224 }
225 Err(broadcast::error::RecvError::Lagged(skipped)) => {
226 dropped_count = dropped_count.saturating_add(skipped as usize);
227 return Ok(stream_next_result(Vec::new(), false, dropped_count));
228 }
229 Err(broadcast::error::RecvError::Closed) => {
230 return Err(ApiError::service_unavailable("stream receiver closed")
231 .with_context(request_id.to_string(), Some("stream.next".to_string())));
232 }
233 }
234 }
235 }
236 }
237 }
238
239 async fn lookup_subscription(
240 &self,
241 subscription_id: &str,
242 ) -> Result<Arc<AsyncMutex<McpSubscriptionState>>, ApiError> {
243 self.subscriptions
244 .lock()
245 .await
246 .get(subscription_id)
247 .cloned()
248 .ok_or_else(|| {
249 ApiError::not_found(format!(
250 "subscription '{}' is not managed by this MCP server",
251 subscription_id
252 ))
253 .with_details(json!({ "subscriptionId": subscription_id }))
254 })
255 }
256}
257
258impl ServerHandler for RalphMcpServer {
259 fn get_info(&self) -> InitializeResult {
260 InitializeResult::new(ServerCapabilities::builder().enable_tools().build())
261 .with_server_info(
262 Implementation::new("ralph-mcp", env!("CARGO_PKG_VERSION"))
263 .with_title("Ralph MCP Server"),
264 )
265 .with_instructions(
266 "Use the Ralph control-plane tools to inspect and manage local orchestration state.",
267 )
268 }
269
270 fn list_tools(
271 &self,
272 request: Option<PaginatedRequestParams>,
273 _context: RequestContext<RoleServer>,
274 ) -> impl Future<Output = Result<ListToolsResult, ErrorData>> + Send + '_ {
275 std::future::ready(
276 self.catalog
277 .list_tools(request.as_ref().and_then(|value| value.cursor.as_deref())),
278 )
279 }
280
281 fn get_tool(&self, name: &str) -> Option<Tool> {
282 self.catalog.lookup(name).map(|entry| entry.tool.clone())
283 }
284
285 async fn call_tool(
286 &self,
287 request: CallToolRequestParams,
288 context: RequestContext<RoleServer>,
289 ) -> Result<CallToolResult, ErrorData> {
290 let request_id = context.id.to_string();
291 let Some(entry) = self.catalog.lookup(request.name.as_ref()) else {
292 return Err(ErrorData::new(
293 ErrorCode::METHOD_NOT_FOUND,
294 format!("tool '{}' is not available", request.name),
295 None,
296 ));
297 };
298
299 let arguments = Value::Object(request.arguments.unwrap_or_default());
300 if let Err(errors) = entry.validate_input(&arguments) {
301 let error = ApiError::invalid_params(format!(
302 "tool '{}' arguments do not match the published schema",
303 request.name
304 ))
305 .with_context(request_id.clone(), Some(entry.method_name().to_string()))
306 .with_details(json!({ "errors": errors }));
307 return Ok(tool_error_result(error));
308 }
309
310 let result = match &entry.target {
311 ToolTarget::Rpc { method } => match method.as_str() {
312 "stream.subscribe" => {
313 self.call_stream_subscribe(arguments, &request_id, entry.tool.name.as_ref())
314 .await
315 }
316 "stream.unsubscribe" => {
317 self.call_stream_unsubscribe(arguments, &request_id, entry.tool.name.as_ref())
318 .await
319 }
320 _ => self.call_rpc_tool(method, arguments, &request_id, entry.tool.name.as_ref()),
321 },
322 ToolTarget::StreamNext => self.call_stream_next(arguments, &request_id).await,
323 };
324
325 match result {
326 Ok(result) => Ok(result),
327 Err(error) => Ok(tool_error_result(error)),
328 }
329 }
330}
331
332pub async fn serve_stdio(config: ApiConfig) -> Result<()> {
333 let server = RalphMcpServer::new(config)?;
334 let running = server.serve(rmcp::transport::stdio()).await?;
335 let _ = running.waiting().await?;
336 Ok(())
337}
338
339#[derive(Clone)]
340struct ToolCatalog {
341 entries: Vec<ToolEntry>,
342 by_name: HashMap<String, usize>,
343}
344
345impl ToolCatalog {
346 fn load() -> Result<Self> {
347 let root_schema: Value = serde_json::from_str(include_str!("../data/rpc-v1-schema.json"))
348 .context("embedded rpc-v1 schema must be valid JSON")?;
349 let request_variants = schema_method_map(&root_schema, "requestByMethod", "params")?;
350 let response_variants = schema_method_map(&root_schema, "responseByMethod", "result")?;
351
352 let mut entries = Vec::new();
353 for method in KNOWN_METHODS {
354 let name = rpc_method_to_tool_name(method);
355 let input_ref = request_variants
356 .get(*method)
357 .with_context(|| format!("missing params schema for method '{method}'"))?;
358 let output_ref = response_variants
359 .get(*method)
360 .with_context(|| format!("missing result schema for method '{method}'"))?;
361
362 let input_schema = schema_ref_object(&root_schema, input_ref)?;
363 let output_schema = schema_ref_object(&root_schema, output_ref)?;
364 let tool = build_tool(name, method, &input_schema, Some(&output_schema));
365 let input_validator = compile_validator(&input_schema)?;
366
367 entries.push(ToolEntry {
368 tool,
369 target: ToolTarget::Rpc {
370 method: (*method).to_string(),
371 },
372 input_validator,
373 });
374 }
375
376 let stream_next_input = stream_next_input_schema();
377 entries.push(ToolEntry {
378 tool: build_tool(
379 "stream_next".to_string(),
380 "stream.next",
381 &stream_next_input,
382 Some(&stream_next_output_schema()),
383 ),
384 target: ToolTarget::StreamNext,
385 input_validator: compile_validator(&stream_next_input)?,
386 });
387
388 entries.sort_by(|left, right| left.tool.name.cmp(&right.tool.name));
389 let by_name = entries
390 .iter()
391 .enumerate()
392 .map(|(index, entry)| (entry.tool.name.to_string(), index))
393 .collect();
394
395 Ok(Self { entries, by_name })
396 }
397
398 fn lookup(&self, name: &str) -> Option<&ToolEntry> {
399 self.by_name
400 .get(name)
401 .and_then(|index| self.entries.get(*index))
402 }
403
404 fn list_tools(&self, cursor: Option<&str>) -> Result<ListToolsResult, ErrorData> {
405 let offset = cursor
406 .map(|value| {
407 value.parse::<usize>().map_err(|_| {
408 ErrorData::invalid_params(format!("invalid tools/list cursor '{value}'"), None)
409 })
410 })
411 .transpose()?
412 .unwrap_or(0);
413
414 if offset > self.entries.len() {
415 return Err(ErrorData::invalid_params(
416 format!("tools/list cursor '{}' is out of range", offset),
417 None,
418 ));
419 }
420
421 let end = (offset + TOOL_PAGE_SIZE).min(self.entries.len());
422 let mut result = ListToolsResult::with_all_items(
423 self.entries[offset..end]
424 .iter()
425 .map(|entry| entry.tool.clone())
426 .collect(),
427 );
428 if end < self.entries.len() {
429 result.next_cursor = Some(end.to_string());
430 }
431 Ok(result)
432 }
433}
434
435#[derive(Clone)]
436struct ToolEntry {
437 tool: Tool,
438 target: ToolTarget,
439 input_validator: Arc<JSONSchema>,
440}
441
442impl ToolEntry {
443 fn method_name(&self) -> &str {
444 match &self.target {
445 ToolTarget::Rpc { method } => method,
446 ToolTarget::StreamNext => "stream.next",
447 }
448 }
449
450 fn validate_input(&self, arguments: &Value) -> Result<(), Vec<String>> {
451 match self.input_validator.validate(arguments) {
452 Ok(()) => Ok(()),
453 Err(errors) => Err(errors.map(|error| error.to_string()).collect()),
454 }
455 }
456}
457
/// How a tool call is routed.
#[derive(Clone)]
enum ToolTarget {
    /// Forwarded to the runtime as the named RPC method.
    Rpc {
        /// RPC method name, e.g. `task.list`.
        method: String,
    },
    /// Served by the MCP server's own `stream.next` implementation.
    StreamNext,
}
463
464#[derive(Debug, Deserialize)]
465#[serde(rename_all = "camelCase")]
466struct StreamNextParams {
467 subscription_id: String,
468 wait_ms: Option<u64>,
469 max_events: Option<u16>,
470}
471
472struct McpSubscriptionState {
473 live_rx: broadcast::Receiver<StreamEventEnvelope>,
474}
475
476struct LiveEventBatch {
477 events: Vec<StreamEventEnvelope>,
478 dropped_count: usize,
479}
480
481fn subscription_id_from_result(result: &Value) -> Option<&str> {
482 result.get("subscriptionId").and_then(Value::as_str)
483}
484
485fn subscription_id_from_arguments(arguments: &Value) -> Option<&str> {
486 arguments.get("subscriptionId").and_then(Value::as_str)
487}
488
489fn is_pending_live_event(
490 streams: &crate::stream_domain::StreamDomain,
491 subscription_id: &str,
492 current_cursor: &str,
493 cursor_sequence: u64,
494 event: &StreamEventEnvelope,
495) -> bool {
496 streams.matches_subscription(subscription_id, event)
497 && (event.sequence > cursor_sequence
498 || (event.sequence == cursor_sequence && event.cursor != current_cursor))
499}
500
501fn collect_live_events(
502 streams: &crate::stream_domain::StreamDomain,
503 subscription_id: &str,
504 current_cursor: &str,
505 cursor_sequence: u64,
506 live_rx: &mut broadcast::Receiver<StreamEventEnvelope>,
507 max_events: usize,
508 dropped_count: usize,
509) -> Option<LiveEventBatch> {
510 if max_events == 0 {
511 return Some(LiveEventBatch {
512 events: Vec::new(),
513 dropped_count,
514 });
515 }
516
517 let mut events = Vec::new();
518 let mut dropped_count = dropped_count;
519 loop {
520 match live_rx.try_recv() {
521 Ok(event) => {
522 if is_pending_live_event(
523 streams,
524 subscription_id,
525 current_cursor,
526 cursor_sequence,
527 &event,
528 ) {
529 events.push(event);
530 if events.len() >= max_events {
531 break;
532 }
533 }
534 }
535 Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
536 dropped_count = dropped_count.saturating_add(skipped as usize);
537 break;
538 }
539 Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
540 break;
541 }
542 }
543 }
544
545 (!events.is_empty() || dropped_count > 0).then_some(LiveEventBatch {
546 events,
547 dropped_count,
548 })
549}
550
551fn tool_catalog() -> &'static ToolCatalog {
552 static CATALOG: OnceLock<ToolCatalog> = OnceLock::new();
553 CATALOG.get_or_init(|| {
554 ToolCatalog::load().expect("embedded rpc-v1 schema must be convertible into an MCP catalog")
555 })
556}
557
/// Derives the MCP tool name from an RPC method name by turning every `.` into `_`
/// (`task.list` becomes `task_list`).
fn rpc_method_to_tool_name(method: &str) -> String {
    method
        .chars()
        .map(|ch| if ch == '.' { '_' } else { ch })
        .collect()
}
561
562fn build_tool(
563 name: String,
564 method: &str,
565 input_schema: &Map<String, Value>,
566 output_schema: Option<&Map<String, Value>>,
567) -> Tool {
568 let description: Cow<'static, str> = match method {
569 "system.health" => "Return the Ralph control-plane health snapshot.".into(),
570 "system.version" => "Return the Ralph control-plane API and server version.".into(),
571 "system.capabilities" => {
572 "List supported Ralph control-plane methods and stream topics.".into()
573 }
574 "task.list" => "List Ralph tasks, with optional filters.".into(),
575 "task.ready" => "List open tasks that are ready to run.".into(),
576 "task.run_all" => "Enqueue every open or queued Ralph task.".into(),
577 "loop.status" => "Return the current primary loop and merge status.".into(),
578 "loop.trigger_merge_task" => "Create a merge task for a completed loop.".into(),
579 "planning.get_artifact" => "Read a generated planning artifact by filename.".into(),
580 "config.get" => "Read the current Ralph YAML configuration.".into(),
581 "config.update" => "Replace the Ralph YAML configuration after validation.".into(),
582 "preset.list" => "List all available Ralph presets.".into(),
583 "collection.import" => "Import a preset collection from YAML.".into(),
584 "collection.export" => "Export a preset collection to YAML.".into(),
585 "stream.subscribe" => "Create a Ralph event stream subscription.".into(),
586 "stream.unsubscribe" => "Close a Ralph event stream subscription.".into(),
587 "stream.ack" => "Advance a Ralph event stream subscription cursor.".into(),
588 "stream.next" => "Poll for the next batch of Ralph stream events.".into(),
589 _ => format!("Invoke Ralph control-plane method `{method}`.").into(),
590 };
591
592 let mut tool = Tool::new_with_raw(name, Some(description), Arc::new(input_schema.clone()));
593 tool.title = Some(method.to_string());
594 tool.output_schema = output_schema.cloned().map(Arc::new);
595 tool.annotations = Some(
596 ToolAnnotations::with_title(method)
597 .read_only(!MUTATING_METHODS.contains(&method))
598 .destructive(MUTATING_METHODS.contains(&method))
599 .open_world(false),
600 );
601 tool
602}
603
604fn schema_method_map(
605 root_schema: &Value,
606 def_name: &str,
607 property_name: &str,
608) -> Result<HashMap<String, String>> {
609 let variants = root_schema
610 .pointer(&format!("/$defs/{def_name}/oneOf"))
611 .and_then(Value::as_array)
612 .with_context(|| format!("schema def '{def_name}' must expose a oneOf array"))?;
613
614 let mut map = HashMap::new();
615 for variant in variants {
616 let method = variant
617 .pointer("/properties/method/const")
618 .and_then(Value::as_str)
619 .context("schema variant is missing properties.method.const")?;
620 let schema_ref = variant
621 .pointer(&format!("/properties/{property_name}/$ref"))
622 .and_then(Value::as_str)
623 .with_context(|| {
624 format!("schema variant '{method}' is missing {property_name}.$ref")
625 })?;
626 map.insert(method.to_string(), schema_ref.to_string());
627 }
628
629 Ok(map)
630}
631
632fn schema_ref_object(root_schema: &Value, schema_ref: &str) -> Result<Map<String, Value>> {
633 let defs = root_schema
634 .get("$defs")
635 .cloned()
636 .context("schema must expose $defs")?;
637 let schema = json!({
638 "$schema": root_schema
639 .get("$schema")
640 .cloned()
641 .unwrap_or_else(|| json!("https://json-schema.org/draft/2020-12/schema")),
642 "$defs": defs,
643 "$ref": schema_ref,
644 });
645 schema
646 .as_object()
647 .cloned()
648 .context("generated schema object must be a JSON object")
649}
650
651fn compile_validator(schema: &Map<String, Value>) -> Result<Arc<JSONSchema>> {
652 let schema_value = Value::Object(schema.clone());
653 let validator = JSONSchema::options()
654 .with_draft(Draft::Draft202012)
655 .compile(&schema_value)
656 .map_err(|error| anyhow!("tool schema must compile: {error}"))?;
657 Ok(Arc::new(validator))
658}
659
660fn stream_next_input_schema() -> Map<String, Value> {
661 json!({
662 "$schema": "https://json-schema.org/draft/2020-12/schema",
663 "type": "object",
664 "additionalProperties": false,
665 "properties": {
666 "subscriptionId": {
667 "type": "string",
668 "minLength": 1
669 },
670 "waitMs": {
671 "type": "integer",
672 "minimum": 1,
673 "maximum": MAX_STREAM_NEXT_WAIT_MS
674 },
675 "maxEvents": {
676 "type": "integer",
677 "minimum": 1,
678 "maximum": MAX_STREAM_NEXT_EVENTS
679 }
680 },
681 "required": ["subscriptionId"]
682 })
683 .as_object()
684 .cloned()
685 .expect("stream.next input schema must be an object")
686}
687
688fn stream_next_output_schema() -> Map<String, Value> {
689 let event_schema: Value = serde_json::from_str(include_str!("../data/rpc-v1-events.json"))
690 .expect("embedded rpc-v1 event schema must be valid JSON");
691 let defs = event_schema
692 .get("$defs")
693 .cloned()
694 .expect("embedded rpc-v1 event schema must expose $defs");
695
696 json!({
697 "$schema": "https://json-schema.org/draft/2020-12/schema",
698 "$defs": defs,
699 "type": "object",
700 "additionalProperties": false,
701 "properties": {
702 "events": {
703 "type": "array",
704 "items": { "$ref": "#/$defs/eventEnvelope" }
705 },
706 "timedOut": { "type": "boolean" },
707 "droppedCount": { "type": "integer", "minimum": 0 }
708 },
709 "required": ["events", "timedOut", "droppedCount"]
710 })
711 .as_object()
712 .cloned()
713 .expect("stream.next output schema must be an object")
714}
715
716fn stream_next_result(
717 events: Vec<StreamEventEnvelope>,
718 timed_out: bool,
719 dropped_count: usize,
720) -> CallToolResult {
721 CallToolResult::structured(json!({
722 "events": events,
723 "timedOut": timed_out,
724 "droppedCount": dropped_count,
725 }))
726}
727
728fn tool_error_result(error: ApiError) -> CallToolResult {
729 CallToolResult::structured_error(json!({
730 "code": error.code.as_str(),
731 "message": error.message,
732 "details": error.details,
733 "requestId": error.request_id,
734 "method": error.method,
735 "retryable": error.retryable,
736 }))
737}
738
#[cfg(test)]
mod tests {
    use tempfile::TempDir;

    use super::*;

    /// Creates a server rooted in a fresh temporary workspace. The `TempDir` is
    /// returned so the directory lives as long as the test needs the server.
    fn test_server() -> (RalphMcpServer, TempDir) {
        let workspace = tempfile::tempdir().expect("workspace tempdir should be created");
        let config = ApiConfig {
            workspace_root: workspace.path().to_path_buf(),
            ..ApiConfig::default()
        };
        let server = RalphMcpServer::new(config).expect("MCP server should initialize");
        (server, workspace)
    }

    /// Subscribes to `task.status.changed` and returns the new subscription id.
    async fn subscribe_to_task_status(server: &RalphMcpServer, request_id: &str) -> String {
        let subscribed = server
            .call_stream_subscribe(
                json!({ "topics": ["task.status.changed"] }),
                request_id,
                "stream_subscribe",
            )
            .await
            .expect("stream.subscribe should succeed");
        subscribed.structured_content.unwrap()["subscriptionId"]
            .as_str()
            .expect("subscription id")
            .to_string()
    }

    #[test]
    fn catalog_maps_known_tools() {
        let catalog = tool_catalog();
        for name in ["task_list", "loop_trigger_merge_task", "stream_next"] {
            assert!(catalog.lookup(name).is_some());
        }
    }

    #[test]
    fn catalog_publishes_output_schemas() {
        let catalog = tool_catalog();
        let task_create = catalog.lookup("task_create").expect("task_create tool");
        assert!(task_create.tool.output_schema.is_some());
        let stream_next = catalog.lookup("stream_next").expect("stream_next tool");
        assert!(stream_next.tool.output_schema.is_some());
    }

    #[tokio::test]
    async fn stream_next_times_out_without_events() {
        let (server, _workspace) = test_server();
        let subscription_id = subscribe_to_task_status(&server, "req-stream-subscribe-1").await;

        let arguments = json!({
            "subscriptionId": subscription_id,
            "waitMs": 1,
        });
        let result = server
            .call_stream_next(arguments, "req-stream-next-timeout-1")
            .await
            .expect("stream.next should succeed");

        assert_eq!(result.structured_content.unwrap()["timedOut"], true);
    }

    #[tokio::test]
    async fn stream_next_returns_matching_events() {
        let (server, _workspace) = test_server();
        let subscription_id = subscribe_to_task_status(&server, "req-stream-subscribe-2").await;

        server.runtime.stream_domain().publish(
            "task.status.changed",
            "task",
            "task-123",
            json!({ "from": "open", "to": "running" }),
        );

        let arguments = json!({
            "subscriptionId": subscription_id,
            "waitMs": 25,
            "maxEvents": 5,
        });
        let result = server
            .call_stream_next(arguments, "req-stream-next-events-1")
            .await
            .expect("stream.next should succeed");

        let payload = result.structured_content.expect("stream.next payload");
        assert_eq!(payload["timedOut"], false);
        assert_eq!(payload["events"].as_array().unwrap().len(), 1);
        assert_eq!(payload["events"][0]["topic"], "task.status.changed");
    }
}