#![deny(unsafe_code)]
use std::{future::Future, sync::Arc};
use grex_core::{Registry, Scheduler};
use rmcp::{
model::{
CallToolRequestParams, CallToolResult, Implementation, ListToolsResult,
PaginatedRequestParams, ServerCapabilities, ServerInfo,
},
service::{MaybeSendFuture, RequestContext},
transport::IntoTransport,
ErrorData as McpError, RoleServer, ServerHandler, ServiceExt,
};
pub mod error;
pub mod tools;
pub use error::{CancelledExt, REQUEST_CANCELLED};
pub use tools::VERBS_EXPOSED;
#[derive(Clone)]
pub struct ServerState {
pub scheduler: Arc<Scheduler>,
pub registry: Arc<Registry>,
pub manifest_path: Arc<std::path::PathBuf>,
pub workspace: Arc<std::path::PathBuf>,
}
impl ServerState {
pub fn new(
scheduler: Scheduler,
registry: Registry,
manifest_path: std::path::PathBuf,
workspace: std::path::PathBuf,
) -> Self {
Self {
scheduler: Arc::new(scheduler),
registry: Arc::new(registry),
manifest_path: Arc::new(manifest_path),
workspace: Arc::new(workspace),
}
}
pub fn for_tests() -> Self {
let cwd = std::env::current_dir().unwrap_or_else(|_| std::path::PathBuf::from("."));
let manifest = grex_core::manifest::event_log_path(&cwd);
Self::new(Scheduler::new(1), Registry::default(), manifest, cwd)
}
}
#[derive(Clone)]
pub struct GrexMcpServer {
pub(crate) state: ServerState,
}
impl GrexMcpServer {
pub fn new(state: ServerState) -> Self {
Self { state }
}
pub async fn run<T, E, A>(
self,
transport: T,
) -> Result<(), rmcp::service::ServerInitializeError>
where
T: IntoTransport<RoleServer, E, A>,
E: std::error::Error + Send + Sync + 'static,
{
init_stderr_tracing();
let running = self.serve(transport).await?;
let _quit_reason = running.waiting().await;
Ok(())
}
}
fn init_stderr_tracing() {
use tracing::subscriber::set_global_default;
use tracing_subscriber::{fmt, EnvFilter};
let filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let subscriber = fmt().with_writer(std::io::stderr).with_env_filter(filter).finish();
let _ = set_global_default(subscriber);
}
impl ServerHandler for GrexMcpServer {
fn get_info(&self) -> ServerInfo {
let mut info = ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
.with_protocol_version(rmcp::model::ProtocolVersion::V_2025_06_18);
let mut implementation = Implementation::default();
implementation.name = "grex".into();
implementation.title = Some("grex MCP server".into());
implementation.version = env!("CARGO_PKG_VERSION").into();
info.server_info = implementation;
info.instructions = Some(
"grex pack-orchestrator MCP surface. 11 tools reachable via tools/call; \
cancellation via notifications/cancelled. See `.omne/cfg/mcp.md`."
.into(),
);
info
}
fn list_tools(
&self,
_request: Option<PaginatedRequestParams>,
_context: RequestContext<RoleServer>,
) -> impl Future<Output = Result<ListToolsResult, McpError>> + MaybeSendFuture + '_ {
let tools = Self::tool_router().list_all();
std::future::ready(Ok(ListToolsResult { tools, next_cursor: None, meta: None }))
}
fn call_tool(
&self,
request: CallToolRequestParams,
context: RequestContext<RoleServer>,
) -> impl Future<Output = Result<CallToolResult, McpError>> + MaybeSendFuture + '_ {
let tcc = rmcp::handler::server::tool::ToolCallContext::new(self, request, context);
async move { Self::tool_router().call(tcc).await }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn server_state_for_tests_constructs() {
let s = ServerState::for_tests();
assert!(s.scheduler.max_parallelism() >= 1);
}
#[test]
fn server_constructs() {
let _ = GrexMcpServer::new(ServerState::for_tests());
}
}