Skip to main content

ralph_api/
mcp.rs

1use std::borrow::Cow;
2use std::collections::HashMap;
3use std::sync::{Arc, OnceLock};
4use std::time::Duration;
5
6use anyhow::{Context, Result, anyhow};
7use jsonschema::{Draft, JSONSchema};
8use rmcp::model::{
9    CallToolRequestParams, CallToolResult, ErrorCode, ErrorData, Implementation, InitializeResult,
10    ListToolsResult, PaginatedRequestParams, ServerCapabilities, Tool, ToolAnnotations,
11};
12use rmcp::service::{RequestContext, RoleServer};
13use rmcp::{ServerHandler, ServiceExt};
14use serde::Deserialize;
15use serde_json::{Map, Value, json};
16use tokio::sync::{Mutex as AsyncMutex, broadcast};
17
18use crate::config::ApiConfig;
19use crate::errors::ApiError;
20use crate::protocol::{KNOWN_METHODS, MUTATING_METHODS};
21use crate::runtime::RpcRuntime;
22use crate::stream_domain::StreamEventEnvelope;
23
24const MCP_PRINCIPAL: &str = "trusted_local";
25const TOOL_PAGE_SIZE: usize = 128;
26const DEFAULT_STREAM_NEXT_WAIT_MS: u64 = 30_000;
27const MAX_STREAM_NEXT_WAIT_MS: u64 = 120_000;
28const DEFAULT_STREAM_NEXT_EVENTS: u16 = 50;
29const MAX_STREAM_NEXT_EVENTS: u16 = 200;
30
31#[derive(Clone)]
32pub struct RalphMcpServer {
33    runtime: RpcRuntime,
34    catalog: Arc<ToolCatalog>,
35    subscriptions: Arc<AsyncMutex<HashMap<String, Arc<AsyncMutex<McpSubscriptionState>>>>>,
36}
37
38impl RalphMcpServer {
39    pub fn new(config: ApiConfig) -> Result<Self> {
40        let runtime = RpcRuntime::new(config)?;
41        let catalog = Arc::new(tool_catalog().clone());
42        Ok(Self {
43            runtime,
44            catalog,
45            subscriptions: Arc::new(AsyncMutex::new(HashMap::new())),
46        })
47    }
48
49    fn invoke_rpc_tool(
50        &self,
51        method: &str,
52        arguments: Value,
53        request_id: &str,
54        tool_name: &str,
55    ) -> Result<Value, ApiError> {
56        let idempotency_key = MUTATING_METHODS
57            .contains(&method)
58            .then(|| format!("mcp:{tool_name}:{request_id}"));
59        self.runtime.invoke_method(
60            request_id.to_string(),
61            method,
62            arguments,
63            MCP_PRINCIPAL,
64            idempotency_key,
65        )
66    }
67
68    fn call_rpc_tool(
69        &self,
70        method: &str,
71        arguments: Value,
72        request_id: &str,
73        tool_name: &str,
74    ) -> Result<CallToolResult, ApiError> {
75        let result = self.invoke_rpc_tool(method, arguments, request_id, tool_name)?;
76        Ok(CallToolResult::structured(result))
77    }
78
79    async fn call_stream_subscribe(
80        &self,
81        arguments: Value,
82        request_id: &str,
83        tool_name: &str,
84    ) -> Result<CallToolResult, ApiError> {
85        let live_rx = self.runtime.stream_domain().live_receiver();
86        let result = self.invoke_rpc_tool("stream.subscribe", arguments, request_id, tool_name)?;
87        let subscription_id = subscription_id_from_result(&result)
88            .ok_or_else(|| ApiError::internal("stream.subscribe result missing subscriptionId"))
89            .map_err(|error| {
90                error.with_context(request_id.to_string(), Some("stream.subscribe".to_string()))
91            })?;
92        self.subscriptions.lock().await.insert(
93            subscription_id.to_string(),
94            Arc::new(AsyncMutex::new(McpSubscriptionState { live_rx })),
95        );
96        Ok(CallToolResult::structured(result))
97    }
98
99    async fn call_stream_unsubscribe(
100        &self,
101        arguments: Value,
102        request_id: &str,
103        tool_name: &str,
104    ) -> Result<CallToolResult, ApiError> {
105        let subscription_id = subscription_id_from_arguments(&arguments)
106            .ok_or_else(|| ApiError::internal("stream.unsubscribe args missing subscriptionId"))
107            .map_err(|error| {
108                error.with_context(
109                    request_id.to_string(),
110                    Some("stream.unsubscribe".to_string()),
111                )
112            })?
113            .to_string();
114        let result =
115            self.invoke_rpc_tool("stream.unsubscribe", arguments, request_id, tool_name)?;
116        self.subscriptions.lock().await.remove(&subscription_id);
117        Ok(CallToolResult::structured(result))
118    }
119
120    async fn call_stream_next(
121        &self,
122        arguments: Value,
123        request_id: &str,
124    ) -> Result<CallToolResult, ApiError> {
125        let params: StreamNextParams = serde_json::from_value(arguments).map_err(|error| {
126            ApiError::invalid_params(format!("invalid params for method 'stream.next': {error}"))
127                .with_context(request_id.to_string(), Some("stream.next".to_string()))
128        })?;
129        let wait_ms = params
130            .wait_ms
131            .unwrap_or(DEFAULT_STREAM_NEXT_WAIT_MS)
132            .clamp(1, MAX_STREAM_NEXT_WAIT_MS);
133        let max_events = params
134            .max_events
135            .unwrap_or(DEFAULT_STREAM_NEXT_EVENTS)
136            .clamp(1, MAX_STREAM_NEXT_EVENTS) as usize;
137
138        let streams = self.runtime.stream_domain();
139        if !streams.has_subscription(&params.subscription_id) {
140            return Err(ApiError::not_found(format!(
141                "subscription '{}' not found",
142                params.subscription_id
143            ))
144            .with_context(request_id.to_string(), Some("stream.next".to_string()))
145            .with_details(json!({ "subscriptionId": params.subscription_id })));
146        }
147
148        let subscription = self.lookup_subscription(&params.subscription_id).await?;
149        let replay = streams
150            .replay_for_subscription(&params.subscription_id)
151            .map_err(|error| {
152                error.with_context(request_id.to_string(), Some("stream.next".to_string()))
153            })?;
154
155        let mut events = replay.events;
156        let mut dropped_count = replay.dropped_count;
157        if !events.is_empty() || dropped_count > 0 {
158            events.truncate(max_events);
159            return Ok(stream_next_result(events, false, dropped_count));
160        }
161
162        let mut live_state = subscription.lock().await;
163        let current_cursor = streams
164            .subscription_cursor(&params.subscription_id)
165            .map_err(|error| {
166                error.with_context(request_id.to_string(), Some("stream.next".to_string()))
167            })?;
168        let current_cursor_sequence = streams
169            .subscription_cursor_sequence(&params.subscription_id)
170            .map_err(|error| {
171                error.with_context(request_id.to_string(), Some("stream.next".to_string()))
172            })?;
173
174        if let Some(result) = collect_live_events(
175            &streams,
176            &params.subscription_id,
177            &current_cursor,
178            current_cursor_sequence,
179            &mut live_state.live_rx,
180            max_events,
181            dropped_count,
182        ) {
183            return Ok(stream_next_result(
184                result.events,
185                false,
186                result.dropped_count,
187            ));
188        }
189
190        let timer = tokio::time::sleep(Duration::from_millis(wait_ms));
191        tokio::pin!(timer);
192
193        loop {
194            tokio::select! {
195                _ = &mut timer => {
196                    return Ok(stream_next_result(Vec::new(), true, dropped_count));
197                }
198                message = live_state.live_rx.recv() => {
199                    match message {
200                        Ok(event) => {
201                            if is_pending_live_event(
202                                &streams,
203                                &params.subscription_id,
204                                &current_cursor,
205                                current_cursor_sequence,
206                                &event,
207                            ) {
208                                events.push(event);
209                                let drained = collect_live_events(
210                                    &streams,
211                                    &params.subscription_id,
212                                    &current_cursor,
213                                    current_cursor_sequence,
214                                    &mut live_state.live_rx,
215                                    max_events - events.len(),
216                                    dropped_count,
217                                );
218                                if let Some(drained) = drained {
219                                    dropped_count = drained.dropped_count;
220                                    events.extend(drained.events);
221                                }
222                                return Ok(stream_next_result(events, false, dropped_count));
223                            }
224                        }
225                        Err(broadcast::error::RecvError::Lagged(skipped)) => {
226                            dropped_count = dropped_count.saturating_add(skipped as usize);
227                            return Ok(stream_next_result(Vec::new(), false, dropped_count));
228                        }
229                        Err(broadcast::error::RecvError::Closed) => {
230                            return Err(ApiError::service_unavailable("stream receiver closed")
231                                .with_context(request_id.to_string(), Some("stream.next".to_string())));
232                        }
233                    }
234                }
235            }
236        }
237    }
238
239    async fn lookup_subscription(
240        &self,
241        subscription_id: &str,
242    ) -> Result<Arc<AsyncMutex<McpSubscriptionState>>, ApiError> {
243        self.subscriptions
244            .lock()
245            .await
246            .get(subscription_id)
247            .cloned()
248            .ok_or_else(|| {
249                ApiError::not_found(format!(
250                    "subscription '{}' is not managed by this MCP server",
251                    subscription_id
252                ))
253                .with_details(json!({ "subscriptionId": subscription_id }))
254            })
255    }
256}
257
258impl ServerHandler for RalphMcpServer {
259    fn get_info(&self) -> InitializeResult {
260        InitializeResult::new(ServerCapabilities::builder().enable_tools().build())
261            .with_server_info(
262                Implementation::new("ralph-mcp", env!("CARGO_PKG_VERSION"))
263                    .with_title("Ralph MCP Server"),
264            )
265            .with_instructions(
266                "Use the Ralph control-plane tools to inspect and manage local orchestration state.",
267            )
268    }
269
270    fn list_tools(
271        &self,
272        request: Option<PaginatedRequestParams>,
273        _context: RequestContext<RoleServer>,
274    ) -> impl Future<Output = Result<ListToolsResult, ErrorData>> + Send + '_ {
275        std::future::ready(
276            self.catalog
277                .list_tools(request.as_ref().and_then(|value| value.cursor.as_deref())),
278        )
279    }
280
281    fn get_tool(&self, name: &str) -> Option<Tool> {
282        self.catalog.lookup(name).map(|entry| entry.tool.clone())
283    }
284
285    async fn call_tool(
286        &self,
287        request: CallToolRequestParams,
288        context: RequestContext<RoleServer>,
289    ) -> Result<CallToolResult, ErrorData> {
290        let request_id = context.id.to_string();
291        let Some(entry) = self.catalog.lookup(request.name.as_ref()) else {
292            return Err(ErrorData::new(
293                ErrorCode::METHOD_NOT_FOUND,
294                format!("tool '{}' is not available", request.name),
295                None,
296            ));
297        };
298
299        let arguments = Value::Object(request.arguments.unwrap_or_default());
300        if let Err(errors) = entry.validate_input(&arguments) {
301            let error = ApiError::invalid_params(format!(
302                "tool '{}' arguments do not match the published schema",
303                request.name
304            ))
305            .with_context(request_id.clone(), Some(entry.method_name().to_string()))
306            .with_details(json!({ "errors": errors }));
307            return Ok(tool_error_result(error));
308        }
309
310        let result = match &entry.target {
311            ToolTarget::Rpc { method } => match method.as_str() {
312                "stream.subscribe" => {
313                    self.call_stream_subscribe(arguments, &request_id, entry.tool.name.as_ref())
314                        .await
315                }
316                "stream.unsubscribe" => {
317                    self.call_stream_unsubscribe(arguments, &request_id, entry.tool.name.as_ref())
318                        .await
319                }
320                _ => self.call_rpc_tool(method, arguments, &request_id, entry.tool.name.as_ref()),
321            },
322            ToolTarget::StreamNext => self.call_stream_next(arguments, &request_id).await,
323        };
324
325        match result {
326            Ok(result) => Ok(result),
327            Err(error) => Ok(tool_error_result(error)),
328        }
329    }
330}
331
332pub async fn serve_stdio(config: ApiConfig) -> Result<()> {
333    let server = RalphMcpServer::new(config)?;
334    let running = server.serve(rmcp::transport::stdio()).await?;
335    let _ = running.waiting().await?;
336    Ok(())
337}
338
339#[derive(Clone)]
340struct ToolCatalog {
341    entries: Vec<ToolEntry>,
342    by_name: HashMap<String, usize>,
343}
344
345impl ToolCatalog {
346    fn load() -> Result<Self> {
347        let root_schema: Value = serde_json::from_str(include_str!("../data/rpc-v1-schema.json"))
348            .context("embedded rpc-v1 schema must be valid JSON")?;
349        let request_variants = schema_method_map(&root_schema, "requestByMethod", "params")?;
350        let response_variants = schema_method_map(&root_schema, "responseByMethod", "result")?;
351
352        let mut entries = Vec::new();
353        for method in KNOWN_METHODS {
354            let name = rpc_method_to_tool_name(method);
355            let input_ref = request_variants
356                .get(*method)
357                .with_context(|| format!("missing params schema for method '{method}'"))?;
358            let output_ref = response_variants
359                .get(*method)
360                .with_context(|| format!("missing result schema for method '{method}'"))?;
361
362            let input_schema = schema_ref_object(&root_schema, input_ref)?;
363            let output_schema = schema_ref_object(&root_schema, output_ref)?;
364            let tool = build_tool(name, method, &input_schema, Some(&output_schema));
365            let input_validator = compile_validator(&input_schema)?;
366
367            entries.push(ToolEntry {
368                tool,
369                target: ToolTarget::Rpc {
370                    method: (*method).to_string(),
371                },
372                input_validator,
373            });
374        }
375
376        let stream_next_input = stream_next_input_schema();
377        entries.push(ToolEntry {
378            tool: build_tool(
379                "stream_next".to_string(),
380                "stream.next",
381                &stream_next_input,
382                Some(&stream_next_output_schema()),
383            ),
384            target: ToolTarget::StreamNext,
385            input_validator: compile_validator(&stream_next_input)?,
386        });
387
388        entries.sort_by(|left, right| left.tool.name.cmp(&right.tool.name));
389        let by_name = entries
390            .iter()
391            .enumerate()
392            .map(|(index, entry)| (entry.tool.name.to_string(), index))
393            .collect();
394
395        Ok(Self { entries, by_name })
396    }
397
398    fn lookup(&self, name: &str) -> Option<&ToolEntry> {
399        self.by_name
400            .get(name)
401            .and_then(|index| self.entries.get(*index))
402    }
403
404    fn list_tools(&self, cursor: Option<&str>) -> Result<ListToolsResult, ErrorData> {
405        let offset = cursor
406            .map(|value| {
407                value.parse::<usize>().map_err(|_| {
408                    ErrorData::invalid_params(format!("invalid tools/list cursor '{value}'"), None)
409                })
410            })
411            .transpose()?
412            .unwrap_or(0);
413
414        if offset > self.entries.len() {
415            return Err(ErrorData::invalid_params(
416                format!("tools/list cursor '{}' is out of range", offset),
417                None,
418            ));
419        }
420
421        let end = (offset + TOOL_PAGE_SIZE).min(self.entries.len());
422        let mut result = ListToolsResult::with_all_items(
423            self.entries[offset..end]
424                .iter()
425                .map(|entry| entry.tool.clone())
426                .collect(),
427        );
428        if end < self.entries.len() {
429            result.next_cursor = Some(end.to_string());
430        }
431        Ok(result)
432    }
433}
434
435#[derive(Clone)]
436struct ToolEntry {
437    tool: Tool,
438    target: ToolTarget,
439    input_validator: Arc<JSONSchema>,
440}
441
442impl ToolEntry {
443    fn method_name(&self) -> &str {
444        match &self.target {
445            ToolTarget::Rpc { method } => method,
446            ToolTarget::StreamNext => "stream.next",
447        }
448    }
449
450    fn validate_input(&self, arguments: &Value) -> Result<(), Vec<String>> {
451        match self.input_validator.validate(arguments) {
452            Ok(()) => Ok(()),
453            Err(errors) => Err(errors.map(|error| error.to_string()).collect()),
454        }
455    }
456}
457
458#[derive(Clone)]
459enum ToolTarget {
460    Rpc { method: String },
461    StreamNext,
462}
463
464#[derive(Debug, Deserialize)]
465#[serde(rename_all = "camelCase")]
466struct StreamNextParams {
467    subscription_id: String,
468    wait_ms: Option<u64>,
469    max_events: Option<u16>,
470}
471
472struct McpSubscriptionState {
473    live_rx: broadcast::Receiver<StreamEventEnvelope>,
474}
475
476struct LiveEventBatch {
477    events: Vec<StreamEventEnvelope>,
478    dropped_count: usize,
479}
480
481fn subscription_id_from_result(result: &Value) -> Option<&str> {
482    result.get("subscriptionId").and_then(Value::as_str)
483}
484
485fn subscription_id_from_arguments(arguments: &Value) -> Option<&str> {
486    arguments.get("subscriptionId").and_then(Value::as_str)
487}
488
489fn is_pending_live_event(
490    streams: &crate::stream_domain::StreamDomain,
491    subscription_id: &str,
492    current_cursor: &str,
493    cursor_sequence: u64,
494    event: &StreamEventEnvelope,
495) -> bool {
496    streams.matches_subscription(subscription_id, event)
497        && (event.sequence > cursor_sequence
498            || (event.sequence == cursor_sequence && event.cursor != current_cursor))
499}
500
501fn collect_live_events(
502    streams: &crate::stream_domain::StreamDomain,
503    subscription_id: &str,
504    current_cursor: &str,
505    cursor_sequence: u64,
506    live_rx: &mut broadcast::Receiver<StreamEventEnvelope>,
507    max_events: usize,
508    dropped_count: usize,
509) -> Option<LiveEventBatch> {
510    if max_events == 0 {
511        return Some(LiveEventBatch {
512            events: Vec::new(),
513            dropped_count,
514        });
515    }
516
517    let mut events = Vec::new();
518    let mut dropped_count = dropped_count;
519    loop {
520        match live_rx.try_recv() {
521            Ok(event) => {
522                if is_pending_live_event(
523                    streams,
524                    subscription_id,
525                    current_cursor,
526                    cursor_sequence,
527                    &event,
528                ) {
529                    events.push(event);
530                    if events.len() >= max_events {
531                        break;
532                    }
533                }
534            }
535            Err(broadcast::error::TryRecvError::Lagged(skipped)) => {
536                dropped_count = dropped_count.saturating_add(skipped as usize);
537                break;
538            }
539            Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
540                break;
541            }
542        }
543    }
544
545    (!events.is_empty() || dropped_count > 0).then_some(LiveEventBatch {
546        events,
547        dropped_count,
548    })
549}
550
551fn tool_catalog() -> &'static ToolCatalog {
552    static CATALOG: OnceLock<ToolCatalog> = OnceLock::new();
553    CATALOG.get_or_init(|| {
554        ToolCatalog::load().expect("embedded rpc-v1 schema must be convertible into an MCP catalog")
555    })
556}
557
558fn rpc_method_to_tool_name(method: &str) -> String {
559    method.replace('.', "_")
560}
561
562fn build_tool(
563    name: String,
564    method: &str,
565    input_schema: &Map<String, Value>,
566    output_schema: Option<&Map<String, Value>>,
567) -> Tool {
568    let description: Cow<'static, str> = match method {
569        "system.health" => "Return the Ralph control-plane health snapshot.".into(),
570        "system.version" => "Return the Ralph control-plane API and server version.".into(),
571        "system.capabilities" => {
572            "List supported Ralph control-plane methods and stream topics.".into()
573        }
574        "task.list" => "List Ralph tasks, with optional filters.".into(),
575        "task.ready" => "List open tasks that are ready to run.".into(),
576        "task.run_all" => "Enqueue every open or queued Ralph task.".into(),
577        "loop.status" => "Return the current primary loop and merge status.".into(),
578        "loop.trigger_merge_task" => "Create a merge task for a completed loop.".into(),
579        "planning.get_artifact" => "Read a generated planning artifact by filename.".into(),
580        "config.get" => "Read the current Ralph YAML configuration.".into(),
581        "config.update" => "Replace the Ralph YAML configuration after validation.".into(),
582        "preset.list" => "List all available Ralph presets.".into(),
583        "collection.import" => "Import a preset collection from YAML.".into(),
584        "collection.export" => "Export a preset collection to YAML.".into(),
585        "stream.subscribe" => "Create a Ralph event stream subscription.".into(),
586        "stream.unsubscribe" => "Close a Ralph event stream subscription.".into(),
587        "stream.ack" => "Advance a Ralph event stream subscription cursor.".into(),
588        "stream.next" => "Poll for the next batch of Ralph stream events.".into(),
589        _ => format!("Invoke Ralph control-plane method `{method}`.").into(),
590    };
591
592    let mut tool = Tool::new_with_raw(name, Some(description), Arc::new(input_schema.clone()));
593    tool.title = Some(method.to_string());
594    tool.output_schema = output_schema.cloned().map(Arc::new);
595    tool.annotations = Some(
596        ToolAnnotations::with_title(method)
597            .read_only(!MUTATING_METHODS.contains(&method))
598            .destructive(MUTATING_METHODS.contains(&method))
599            .open_world(false),
600    );
601    tool
602}
603
604fn schema_method_map(
605    root_schema: &Value,
606    def_name: &str,
607    property_name: &str,
608) -> Result<HashMap<String, String>> {
609    let variants = root_schema
610        .pointer(&format!("/$defs/{def_name}/oneOf"))
611        .and_then(Value::as_array)
612        .with_context(|| format!("schema def '{def_name}' must expose a oneOf array"))?;
613
614    let mut map = HashMap::new();
615    for variant in variants {
616        let method = variant
617            .pointer("/properties/method/const")
618            .and_then(Value::as_str)
619            .context("schema variant is missing properties.method.const")?;
620        let schema_ref = variant
621            .pointer(&format!("/properties/{property_name}/$ref"))
622            .and_then(Value::as_str)
623            .with_context(|| {
624                format!("schema variant '{method}' is missing {property_name}.$ref")
625            })?;
626        map.insert(method.to_string(), schema_ref.to_string());
627    }
628
629    Ok(map)
630}
631
632fn schema_ref_object(root_schema: &Value, schema_ref: &str) -> Result<Map<String, Value>> {
633    let defs = root_schema
634        .get("$defs")
635        .cloned()
636        .context("schema must expose $defs")?;
637    let schema = json!({
638        "$schema": root_schema
639            .get("$schema")
640            .cloned()
641            .unwrap_or_else(|| json!("https://json-schema.org/draft/2020-12/schema")),
642        "$defs": defs,
643        "$ref": schema_ref,
644    });
645    schema
646        .as_object()
647        .cloned()
648        .context("generated schema object must be a JSON object")
649}
650
651fn compile_validator(schema: &Map<String, Value>) -> Result<Arc<JSONSchema>> {
652    let schema_value = Value::Object(schema.clone());
653    let validator = JSONSchema::options()
654        .with_draft(Draft::Draft202012)
655        .compile(&schema_value)
656        .map_err(|error| anyhow!("tool schema must compile: {error}"))?;
657    Ok(Arc::new(validator))
658}
659
660fn stream_next_input_schema() -> Map<String, Value> {
661    json!({
662        "$schema": "https://json-schema.org/draft/2020-12/schema",
663        "type": "object",
664        "additionalProperties": false,
665        "properties": {
666            "subscriptionId": {
667                "type": "string",
668                "minLength": 1
669            },
670            "waitMs": {
671                "type": "integer",
672                "minimum": 1,
673                "maximum": MAX_STREAM_NEXT_WAIT_MS
674            },
675            "maxEvents": {
676                "type": "integer",
677                "minimum": 1,
678                "maximum": MAX_STREAM_NEXT_EVENTS
679            }
680        },
681        "required": ["subscriptionId"]
682    })
683    .as_object()
684    .cloned()
685    .expect("stream.next input schema must be an object")
686}
687
688fn stream_next_output_schema() -> Map<String, Value> {
689    let event_schema: Value = serde_json::from_str(include_str!("../data/rpc-v1-events.json"))
690        .expect("embedded rpc-v1 event schema must be valid JSON");
691    let defs = event_schema
692        .get("$defs")
693        .cloned()
694        .expect("embedded rpc-v1 event schema must expose $defs");
695
696    json!({
697        "$schema": "https://json-schema.org/draft/2020-12/schema",
698        "$defs": defs,
699        "type": "object",
700        "additionalProperties": false,
701        "properties": {
702            "events": {
703                "type": "array",
704                "items": { "$ref": "#/$defs/eventEnvelope" }
705            },
706            "timedOut": { "type": "boolean" },
707            "droppedCount": { "type": "integer", "minimum": 0 }
708        },
709        "required": ["events", "timedOut", "droppedCount"]
710    })
711    .as_object()
712    .cloned()
713    .expect("stream.next output schema must be an object")
714}
715
716fn stream_next_result(
717    events: Vec<StreamEventEnvelope>,
718    timed_out: bool,
719    dropped_count: usize,
720) -> CallToolResult {
721    CallToolResult::structured(json!({
722        "events": events,
723        "timedOut": timed_out,
724        "droppedCount": dropped_count,
725    }))
726}
727
728fn tool_error_result(error: ApiError) -> CallToolResult {
729    CallToolResult::structured_error(json!({
730        "code": error.code.as_str(),
731        "message": error.message,
732        "details": error.details,
733        "requestId": error.request_id,
734        "method": error.method,
735        "retryable": error.retryable,
736    }))
737}
738
739#[cfg(test)]
740mod tests {
741    use tempfile::TempDir;
742
743    use super::*;
744
745    fn test_server() -> (RalphMcpServer, TempDir) {
746        let workspace = tempfile::tempdir().expect("workspace tempdir should be created");
747        let config = ApiConfig {
748            workspace_root: workspace.path().to_path_buf(),
749            ..ApiConfig::default()
750        };
751        (
752            RalphMcpServer::new(config).expect("MCP server should initialize"),
753            workspace,
754        )
755    }
756
757    #[test]
758    fn catalog_maps_known_tools() {
759        let catalog = tool_catalog();
760        assert!(catalog.lookup("task_list").is_some());
761        assert!(catalog.lookup("loop_trigger_merge_task").is_some());
762        assert!(catalog.lookup("stream_next").is_some());
763    }
764
765    #[test]
766    fn catalog_publishes_output_schemas() {
767        let catalog = tool_catalog();
768        let task_create = catalog.lookup("task_create").expect("task_create tool");
769        assert!(task_create.tool.output_schema.is_some());
770        let stream_next = catalog.lookup("stream_next").expect("stream_next tool");
771        assert!(stream_next.tool.output_schema.is_some());
772    }
773
774    #[tokio::test]
775    async fn stream_next_times_out_without_events() {
776        let (server, _workspace) = test_server();
777        let subscribed = server
778            .call_stream_subscribe(
779                json!({ "topics": ["task.status.changed"] }),
780                "req-stream-subscribe-1",
781                "stream_subscribe",
782            )
783            .await
784            .expect("stream.subscribe should succeed");
785        let subscription_id = subscribed.structured_content.unwrap()["subscriptionId"]
786            .as_str()
787            .expect("subscription id")
788            .to_string();
789        let result = server
790            .call_stream_next(
791                json!({
792                    "subscriptionId": subscription_id,
793                    "waitMs": 1,
794                }),
795                "req-stream-next-timeout-1",
796            )
797            .await
798            .expect("stream.next should succeed");
799
800        assert_eq!(result.structured_content.unwrap()["timedOut"], true);
801    }
802
803    #[tokio::test]
804    async fn stream_next_returns_matching_events() {
805        let (server, _workspace) = test_server();
806        let subscribed = server
807            .call_stream_subscribe(
808                json!({ "topics": ["task.status.changed"] }),
809                "req-stream-subscribe-2",
810                "stream_subscribe",
811            )
812            .await
813            .expect("stream.subscribe should succeed");
814        let subscription_id = subscribed.structured_content.unwrap()["subscriptionId"]
815            .as_str()
816            .expect("subscription id")
817            .to_string();
818
819        server.runtime.stream_domain().publish(
820            "task.status.changed",
821            "task",
822            "task-123",
823            json!({ "from": "open", "to": "running" }),
824        );
825
826        let result = server
827            .call_stream_next(
828                json!({
829                    "subscriptionId": subscription_id,
830                    "waitMs": 25,
831                    "maxEvents": 5,
832                }),
833                "req-stream-next-events-1",
834            )
835            .await
836            .expect("stream.next should succeed");
837
838        let payload = result.structured_content.expect("stream.next payload");
839        assert_eq!(payload["timedOut"], false);
840        assert_eq!(payload["events"].as_array().unwrap().len(), 1);
841        assert_eq!(payload["events"][0]["topic"], "task.status.changed");
842    }
843}