Skip to main content

grex_mcp/
lib.rs

1//! grex-mcp — MCP-native server for grex (M7).
2//!
3//! Stage 5 wires the [`rmcp`] framework: the server speaks the MCP
4//! 2025-06-18 wire protocol over any [`rmcp::transport`] (stdio in
5//! production, [`tokio::io::duplex`] in tests). The handshake +
6//! `tools/list` (returning empty) + transport-close shutdown are live;
7//! the 11 tool handlers land in Stage 6, and cancellation in Stage 7.
8//!
9//! # Stdout discipline
10//!
11//! The MCP stdio transport multiplexes **only JSON-RPC bytes** on
12//! `stdout`. All diagnostics MUST go to `stderr`. [`GrexMcpServer::run`]
13//! installs a `tracing_subscriber::fmt` writer pinned to `stderr` —
14//! idempotently, so test reuse and `serve`-from-CLI both work. The
15//! `no_println_lint.rs` integration test enforces zero `println!` /
16//! `print!` macros under `src/` to prevent regressions.
17
18#![deny(unsafe_code)]
19
20use std::{future::Future, sync::Arc};
21
22use grex_core::{Registry, Scheduler};
23use rmcp::{
24    model::{
25        CallToolRequestParams, CallToolResult, Implementation, ListToolsResult,
26        PaginatedRequestParams, ServerCapabilities, ServerInfo,
27    },
28    service::{MaybeSendFuture, RequestContext},
29    transport::IntoTransport,
30    ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
31};
32
33pub mod error;
34pub mod tools;
35
36pub use error::{CancelledExt, REQUEST_CANCELLED};
37
38/// Re-export the registered-tool name list so `serve` smoke tests +
39/// downstream crates have a stable handle on the surface.
40pub use tools::VERBS_EXPOSED;
41
42/// Shared, immutable-after-build state every tool handler reads.
43///
44/// Per-request cancellation is routed through rmcp's built-in `local_ct_pool`
45/// (`FromContextPart<CancellationToken>` injected on each `tools/call`); no
46/// custom in-flight token map lives on `ServerState`. Fields are `Arc`-wrapped
47/// so `ServerHandler::call_tool` can clone cheaply onto each spawn.
48#[derive(Clone)]
49pub struct ServerState {
50    /// Bounded permit pool the verbs use for `--parallel N` semantics.
51    pub scheduler: Arc<Scheduler>,
52    /// Plugin registry resolving manifest verbs to plugin impls.
53    pub registry: Arc<Registry>,
54    /// Path to the `.grex/events.jsonl` event log. Captured at server
55    /// launch and immutable for the session (per spec §"Manifest binding").
56    pub manifest_path: Arc<std::path::PathBuf>,
57    /// Workspace root the server resolves relative paths against.
58    pub workspace: Arc<std::path::PathBuf>,
59}
60
61impl ServerState {
62    /// Build a `ServerState` from already-constructed core components.
63    /// Stage 8 (`grex serve` CLI) will call this from `verbs/serve.rs`.
64    pub fn new(
65        scheduler: Scheduler,
66        registry: Registry,
67        manifest_path: std::path::PathBuf,
68        workspace: std::path::PathBuf,
69    ) -> Self {
70        Self {
71            scheduler: Arc::new(scheduler),
72            registry: Arc::new(registry),
73            manifest_path: Arc::new(manifest_path),
74            workspace: Arc::new(workspace),
75        }
76    }
77
78    /// Build a state suitable for in-process integration tests:
79    /// single-permit scheduler, empty registry, current-dir paths. Used
80    /// only by Stage 5 handshake / discipline tests where no tool body
81    /// actually runs.
82    pub fn for_tests() -> Self {
83        let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
84        let manifest = grex_core::manifest::event_log_path(&cwd);
85        Self::new(Scheduler::new(1), Registry::default(), manifest, cwd)
86    }
87}
88
89/// The grex MCP server. One instance per `grex serve` invocation; one
90/// instance per integration test. Cheap to construct; all state behind
91/// `Arc` so `ServerHandler` impls can clone onto handler tasks for free.
92#[derive(Clone)]
93pub struct GrexMcpServer {
94    pub(crate) state: ServerState,
95}
96
97impl GrexMcpServer {
98    pub fn new(state: ServerState) -> Self {
99        Self { state }
100    }
101
102    /// Drive the server against the given transport until it closes.
103    ///
104    /// Side effects:
105    ///   1. Installs a `tracing_subscriber::fmt` writer pinned to `stderr`
106    ///      (idempotent — repeat calls in tests are tolerated).
107    ///   2. Hands ownership of `self` to rmcp's `ServiceExt::serve`, which
108    ///      runs the JSON-RPC loop on the current Tokio runtime.
109    ///   3. Returns when the transport closes or an unrecoverable framing
110    ///      error occurs.
111    ///
112    /// # Errors
113    /// Surfaces any `ServerInitializeError` from rmcp during the handshake.
114    pub async fn run<T, E, A>(
115        self,
116        transport: T,
117    ) -> Result<(), rmcp::service::ServerInitializeError>
118    where
119        T: IntoTransport<RoleServer, E, A>,
120        E: std::error::Error + Send + Sync + 'static,
121    {
122        init_stderr_tracing();
123        // Per-request cancellation is handled by rmcp's internal local_ct_pool
124        // (see service.rs:766 / :948 / :989-991) — surfaced to handlers via
125        // FromContextPart<CancellationToken>. We do NOT need serve_with_ct here;
126        // that's a server-shutdown surface, not per-request. Stage 5 wiring note
127        // #4 conflated the two — this comment supersedes that note for Stage 7.
128        let running = self.serve(transport).await?;
129        // Wait for transport close. `waiting()` returns once the service loop
130        // exits cleanly (drop of peer / EOF on transport).
131        let _quit_reason = running.waiting().await;
132        Ok(())
133    }
134}
135
136/// Pin `tracing` output to `stderr`, ensuring `stdout` carries only
137/// JSON-RPC bytes. Idempotent: `set_global_default` is allowed to fail
138/// with "already set" (test re-entry, daemon restart, embedded use).
139fn init_stderr_tracing() {
140    use tracing::subscriber::set_global_default;
141    use tracing_subscriber::{fmt, EnvFilter};
142
143    let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
144    let subscriber = fmt().with_writer(std::io::stderr).with_env_filter(filter).finish();
145
146    // Ignore "already set" — tests + repeat invocations both reach here.
147    let _ = set_global_default(subscriber);
148}
149
150impl ServerHandler for GrexMcpServer {
151    fn get_info(&self) -> ServerInfo {
152        let mut info = ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
153            .with_protocol_version(rmcp::model::ProtocolVersion::V_2025_06_18);
154        let mut implementation = Implementation::default();
155        implementation.name = "grex".into();
156        implementation.title = Some("grex MCP server".into());
157        implementation.version = env!("CARGO_PKG_VERSION").into();
158        info.server_info = implementation;
159        info.instructions = Some(
160            "grex pack-orchestrator MCP surface. 11 tools reachable via tools/call; \
161             cancellation via notifications/cancelled. See `.omne/cfg/mcp.md`."
162                .into(),
163        );
164        info
165    }
166
167    /// Stage 6: return all 11 tools assembled from the
168    /// `#[tool_router]`-generated `Self::tool_router()` aggregator.
169    /// The router is rebuilt per-call for now (cheap — just a few
170    /// hashmap inserts of `Arc`-cloned `Tool` values); Stage 7 may
171    /// memoize it onto `ServerState` if profiling shows it matters.
172    fn list_tools(
173        &self,
174        _request: Option<PaginatedRequestParams>,
175        _context: RequestContext<RoleServer>,
176    ) -> impl Future<Output = Result<ListToolsResult, McpError>> + MaybeSendFuture + '_ {
177        let tools = Self::tool_router().list_all();
178        std::future::ready(Ok(ListToolsResult { tools, next_cursor: None, meta: None }))
179    }
180
181    /// Stage 6: dispatch `tools/call` into the per-verb handler matching
182    /// `params.name` via the `#[tool_router]`-generated aggregator. Per-
183    /// tool argument deserialisation is handled by rmcp's `Parameters<P>`
184    /// extractor; bad params yield `-32602`. Unknown tool names also
185    /// yield `-32602` (rmcp's router default).
186    fn call_tool(
187        &self,
188        request: CallToolRequestParams,
189        context: RequestContext<RoleServer>,
190    ) -> impl Future<Output = Result<CallToolResult, McpError>> + MaybeSendFuture + '_ {
191        let tcc = rmcp::handler::server::tool::ToolCallContext::new(self, request, context);
192        async move { Self::tool_router().call(tcc).await }
193    }
194}
195
196#[cfg(test)]
197mod tests {
198    use super::*;
199
200    #[test]
201    fn server_state_for_tests_constructs() {
202        let s = ServerState::for_tests();
203        assert!(s.scheduler.max_parallelism() >= 1);
204    }
205
206    #[test]
207    fn server_constructs() {
208        let _ = GrexMcpServer::new(ServerState::for_tests());
209    }
210}