grex_mcp/lib.rs
1//! grex-mcp — MCP-native server for grex (M7).
2//!
//! Stage 5 wired the [`rmcp`] framework: the server speaks the MCP
//! 2025-06-18 wire protocol over any [`rmcp::transport`] (stdio in
//! production, [`tokio::io::duplex`] in tests). The handshake, `tools/list`,
//! `tools/call` dispatch to the 11 tool handlers (Stage 6), and
//! transport-close shutdown are live. Per-request cancellation (Stage 7)
//! rides on rmcp's built-in cancellation-token pool rather than a custom
//! in-flight token map.
8//!
9//! # Stdout discipline
10//!
11//! The MCP stdio transport multiplexes **only JSON-RPC bytes** on
12//! `stdout`. All diagnostics MUST go to `stderr`. [`GrexMcpServer::run`]
13//! installs a `tracing_subscriber::fmt` writer pinned to `stderr` —
14//! idempotently, so test reuse and `serve`-from-CLI both work. The
15//! `no_println_lint.rs` integration test enforces zero `println!` /
16//! `print!` macros under `src/` to prevent regressions.
17
18#![deny(unsafe_code)]
19
20use std::{future::Future, sync::Arc};
21
22use grex_core::{Registry, Scheduler};
23use rmcp::{
24 model::{
25 CallToolRequestParams, CallToolResult, Implementation, ListToolsResult,
26 PaginatedRequestParams, ServerCapabilities, ServerInfo,
27 },
28 service::{MaybeSendFuture, RequestContext},
29 transport::IntoTransport,
30 ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
31};
32
33pub mod error;
34pub mod tools;
35
36pub use error::{CancelledExt, REQUEST_CANCELLED};
37
38/// Re-export the registered-tool name list so `serve` smoke tests +
39/// downstream crates have a stable handle on the surface.
40pub use tools::VERBS_EXPOSED;
41
42/// Shared, immutable-after-build state every tool handler reads.
43///
44/// Per-request cancellation is routed through rmcp's built-in `local_ct_pool`
45/// (`FromContextPart<CancellationToken>` injected on each `tools/call`); no
46/// custom in-flight token map lives on `ServerState`. Fields are `Arc`-wrapped
47/// so `ServerHandler::call_tool` can clone cheaply onto each spawn.
48#[derive(Clone)]
49pub struct ServerState {
50 /// Bounded permit pool the verbs use for `--parallel N` semantics.
51 pub scheduler: Arc<Scheduler>,
52 /// Plugin registry resolving manifest verbs to plugin impls.
53 pub registry: Arc<Registry>,
54 /// Path to the `.grex/events.jsonl` event log. Captured at server
55 /// launch and immutable for the session (per spec §"Manifest binding").
56 pub manifest_path: Arc<std::path::PathBuf>,
57 /// Workspace root the server resolves relative paths against.
58 pub workspace: Arc<std::path::PathBuf>,
59}
60
61impl ServerState {
62 /// Build a `ServerState` from already-constructed core components.
63 /// Stage 8 (`grex serve` CLI) will call this from `verbs/serve.rs`.
64 pub fn new(
65 scheduler: Scheduler,
66 registry: Registry,
67 manifest_path: std::path::PathBuf,
68 workspace: std::path::PathBuf,
69 ) -> Self {
70 Self {
71 scheduler: Arc::new(scheduler),
72 registry: Arc::new(registry),
73 manifest_path: Arc::new(manifest_path),
74 workspace: Arc::new(workspace),
75 }
76 }
77
78 /// Build a state suitable for in-process integration tests:
79 /// single-permit scheduler, empty registry, current-dir paths. Used
80 /// only by Stage 5 handshake / discipline tests where no tool body
81 /// actually runs.
82 pub fn for_tests() -> Self {
83 let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
84 let manifest = grex_core::manifest::event_log_path(&cwd);
85 Self::new(Scheduler::new(1), Registry::default(), manifest, cwd)
86 }
87}
88
89/// The grex MCP server. One instance per `grex serve` invocation; one
90/// instance per integration test. Cheap to construct; all state behind
91/// `Arc` so `ServerHandler` impls can clone onto handler tasks for free.
92#[derive(Clone)]
93pub struct GrexMcpServer {
94 pub(crate) state: ServerState,
95}
96
97impl GrexMcpServer {
98 pub fn new(state: ServerState) -> Self {
99 Self { state }
100 }
101
102 /// Drive the server against the given transport until it closes.
103 ///
104 /// Side effects:
105 /// 1. Installs a `tracing_subscriber::fmt` writer pinned to `stderr`
106 /// (idempotent — repeat calls in tests are tolerated).
107 /// 2. Hands ownership of `self` to rmcp's `ServiceExt::serve`, which
108 /// runs the JSON-RPC loop on the current Tokio runtime.
109 /// 3. Returns when the transport closes or an unrecoverable framing
110 /// error occurs.
111 ///
112 /// # Errors
113 /// Surfaces any `ServerInitializeError` from rmcp during the handshake.
114 pub async fn run<T, E, A>(
115 self,
116 transport: T,
117 ) -> Result<(), rmcp::service::ServerInitializeError>
118 where
119 T: IntoTransport<RoleServer, E, A>,
120 E: std::error::Error + Send + Sync + 'static,
121 {
122 init_stderr_tracing();
123 // Per-request cancellation is handled by rmcp's internal local_ct_pool
124 // (see service.rs:766 / :948 / :989-991) — surfaced to handlers via
125 // FromContextPart<CancellationToken>. We do NOT need serve_with_ct here;
126 // that's a server-shutdown surface, not per-request. Stage 5 wiring note
127 // #4 conflated the two — this comment supersedes that note for Stage 7.
128 let running = self.serve(transport).await?;
129 // Wait for transport close. `waiting()` returns once the service loop
130 // exits cleanly (drop of peer / EOF on transport).
131 let _quit_reason = running.waiting().await;
132 Ok(())
133 }
134}
135
136/// Pin `tracing` output to `stderr`, ensuring `stdout` carries only
137/// JSON-RPC bytes. Idempotent: `set_global_default` is allowed to fail
138/// with "already set" (test re-entry, daemon restart, embedded use).
139fn init_stderr_tracing() {
140 use tracing::subscriber::set_global_default;
141 use tracing_subscriber::{fmt, EnvFilter};
142
143 let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
144 let subscriber = fmt().with_writer(std::io::stderr).with_env_filter(filter).finish();
145
146 // Ignore "already set" — tests + repeat invocations both reach here.
147 let _ = set_global_default(subscriber);
148}
149
150impl ServerHandler for GrexMcpServer {
151 fn get_info(&self) -> ServerInfo {
152 let mut info = ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
153 .with_protocol_version(rmcp::model::ProtocolVersion::V_2025_06_18);
154 let mut implementation = Implementation::default();
155 implementation.name = "grex".into();
156 implementation.title = Some("grex MCP server".into());
157 implementation.version = env!("CARGO_PKG_VERSION").into();
158 info.server_info = implementation;
159 info.instructions = Some(
160 "grex pack-orchestrator MCP surface. 11 tools reachable via tools/call; \
161 cancellation via notifications/cancelled. See `.omne/cfg/mcp.md`."
162 .into(),
163 );
164 info
165 }
166
167 /// Stage 6: return all 11 tools assembled from the
168 /// `#[tool_router]`-generated `Self::tool_router()` aggregator.
169 /// The router is rebuilt per-call for now (cheap — just a few
170 /// hashmap inserts of `Arc`-cloned `Tool` values); Stage 7 may
171 /// memoize it onto `ServerState` if profiling shows it matters.
172 fn list_tools(
173 &self,
174 _request: Option<PaginatedRequestParams>,
175 _context: RequestContext<RoleServer>,
176 ) -> impl Future<Output = Result<ListToolsResult, McpError>> + MaybeSendFuture + '_ {
177 let tools = Self::tool_router().list_all();
178 std::future::ready(Ok(ListToolsResult { tools, next_cursor: None, meta: None }))
179 }
180
181 /// Stage 6: dispatch `tools/call` into the per-verb handler matching
182 /// `params.name` via the `#[tool_router]`-generated aggregator. Per-
183 /// tool argument deserialisation is handled by rmcp's `Parameters<P>`
184 /// extractor; bad params yield `-32602`. Unknown tool names also
185 /// yield `-32602` (rmcp's router default).
186 fn call_tool(
187 &self,
188 request: CallToolRequestParams,
189 context: RequestContext<RoleServer>,
190 ) -> impl Future<Output = Result<CallToolResult, McpError>> + MaybeSendFuture + '_ {
191 let tcc = rmcp::handler::server::tool::ToolCallContext::new(self, request, context);
192 async move { Self::tool_router().call(tcc).await }
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;

    /// The test fixture must come up with a scheduler that reports at
    /// least one unit of parallelism.
    #[test]
    fn server_state_for_tests_constructs() {
        let s = ServerState::for_tests();
        assert!(s.scheduler.max_parallelism() >= 1);
    }

    /// Smoke test: building a server from the fixture state must not panic.
    #[test]
    fn server_constructs() {
        let fixture = ServerState::for_tests();
        let _server = GrexMcpServer::new(fixture);
    }
}