use std::collections::HashMap;
use rmcp::{
handler::server::wrapper::Parameters,
model::{Implementation, ServerCapabilities, ServerInfo},
tool, tool_handler, tool_router, ErrorData as McpError, ServerHandler, ServiceExt,
};
use serde_json::{json, Value};
use khive_request::{parse_request, DslError, ParsedOp};
use khive_runtime::{KhiveRuntime, RuntimeError, VerbRegistry, VerbRegistryBuilder};
use crate::pack::{DialectRegistrar, KgDialect};
use crate::tools::request::RequestParams;
/// MCP server that exposes khive verbs through a single `request` tool.
///
/// Every verb call is dispatched through the wrapped [`VerbRegistry`].
#[derive(Clone)]
pub struct KhiveMcpServer {
// Registry used both to dispatch verbs and to list them in the verb catalog.
registry: VerbRegistry,
}
/// Reason a pack registration attempt failed.
///
/// `Debug` is derived so the failure can be logged or asserted on directly;
/// both payload types already implement it.
#[derive(Debug)]
pub enum PackRegFailure {
    /// A requested pack name was rejected as unknown; carries the rejected value.
    UnknownPack(String),
    /// Building the verb registry from the registered packs failed.
    Registry(khive_runtime::RuntimeError),
}
/// Error returned when building the pack-derived verb registry fails.
///
/// The runtime travels with the error so the caller can take it back and
/// retry (as `KhiveMcpServer::new` does for its kg-only fallback).
pub struct PackRegError {
/// What went wrong during pack registration.
pub failure: PackRegFailure,
/// Runtime handed back to the caller so it can be reused after the failure.
pub runtime: KhiveRuntime,
}
impl std::fmt::Debug for PackRegError {
    /// Renders the error as a `PackRegError { .. }` struct showing a single
    /// field: `unknown` for an unrecognised pack name, `source` for a registry
    /// build error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PackRegError");
        match &self.failure {
            PackRegFailure::UnknownPack(name) => {
                out.field("unknown", name);
            }
            PackRegFailure::Registry(err) => {
                out.field("source", err);
            }
        }
        // The `runtime` field is not printed; `finish_non_exhaustive` marks
        // the output as partial.
        out.finish_non_exhaustive()
    }
}
impl std::fmt::Display for PackRegError {
    /// Writes a one-line, human-readable description of the failure.
    ///
    /// For an unknown pack name the message also lists [`BUILTIN_PACKS`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match &self.failure {
            PackRegFailure::Registry(source) => {
                write!(f, "pack registry build failed: {source}")
            }
            PackRegFailure::UnknownPack(name) => {
                let known = BUILTIN_PACKS.join(", ");
                write!(
                    f,
                    "unknown pack name {:?} — built-in packs: {}",
                    name, known
                )
            }
        }
    }
}
// Marker impl: relies on the hand-written `Debug` and `Display` impls for
// `PackRegError`; no `source()` override is provided.
impl std::error::Error for PackRegError {}
/// Names of the built-in packs, as reported by `KgDialect::pack_names()`.
pub const BUILTIN_PACKS: &[&str] = KgDialect::pack_names();
impl KhiveMcpServer {
/// Builds a server from the packs listed in the runtime's configuration.
///
/// If registering the configured packs fails, a warning is logged and the
/// server falls back to a registry containing only the `kg` pack, built on
/// the runtime recovered from the error.
///
/// # Panics
///
/// Panics if the `kg`-only fallback registration itself fails.
pub fn new(runtime: KhiveRuntime) -> Self {
    let packs: Vec<String> = runtime.config().packs.clone();
    Self::with_packs(runtime, &packs).unwrap_or_else(|err| {
        tracing::warn!("pack registration: {err}; falling back to kg only");
        // Reuse `with_packs` for the fallback instead of repeating its
        // builder setup, so both paths always configure the registry
        // (gate, default namespace, event store, edge rules) identically.
        Self::with_packs(err.runtime, &[String::from("kg")])
            .expect("fallback registration of the built-in kg pack succeeds")
    })
}
/// Builds a server whose registry contains exactly the named packs.
///
/// Duplicate names in `packs` are registered only once. The builder is
/// configured with the runtime's gate and default namespace, and with an
/// event store when `runtime.events(None)` succeeds (a failure there is
/// ignored). After a successful build the registry's edge rules are
/// installed on the runtime.
///
/// # Errors
///
/// Returns a [`PackRegError`] carrying the runtime back to the caller when
/// a pack name is rejected by `KgDialect::register`
/// ([`PackRegFailure::UnknownPack`]) or when building the registry fails
/// ([`PackRegFailure::Registry`]).
// The error type carries the runtime by value so callers can recover it,
// which is what makes the `Err` variant large.
#[allow(clippy::result_large_err)]
pub fn with_packs(runtime: KhiveRuntime, packs: &[String]) -> Result<Self, PackRegError> {
    let gate = runtime.config().gate.clone();
    let default_namespace = runtime.config().default_namespace.clone();
    let mut builder = VerbRegistryBuilder::new();
    builder.with_gate(gate);
    builder.with_default_namespace(default_namespace);
    // Best effort: the registry is still built when no event store is available.
    if let Ok(event_store) = runtime.events(None) {
        builder.with_event_store(event_store);
    }
    let mut seen: std::collections::HashSet<&str> = std::collections::HashSet::new();
    for name in packs {
        // `insert` returns false for a name that was already registered.
        if !seen.insert(name.as_str()) {
            continue;
        }
        if let Err(unknown) = KgDialect::register(name, runtime.clone(), &mut builder) {
            return Err(PackRegError {
                failure: PackRegFailure::UnknownPack(unknown),
                runtime,
            });
        }
    }
    // Move the runtime into the error (as the unknown-pack path does)
    // rather than cloning it inside a `map_err` closure.
    let registry = match builder.build() {
        Ok(registry) => registry,
        Err(source) => {
            return Err(PackRegError {
                failure: PackRegFailure::Registry(source),
                runtime,
            });
        }
    };
    runtime.install_edge_rules(registry.all_edge_rules());
    Ok(Self { registry })
}
#[doc(hidden)]
pub fn from_registry(registry: VerbRegistry) -> Self {
Self { registry }
}
/// Serves this handler over the rmcp stdio transport and waits on the
/// running service.
///
/// # Errors
///
/// Propagates any error from starting the service or from waiting on it.
pub async fn serve_stdio(self) -> anyhow::Result<()> {
    let running = self.serve(rmcp::transport::stdio()).await?;
    running.waiting().await?;
    Ok(())
}
fn verb_catalog(&self) -> String {
let mut by_verb: HashMap<&str, &str> = HashMap::new();
for v in self.registry.all_verbs() {
by_verb.entry(v.name).or_insert(v.description);
}
let mut entries: Vec<(&&str, &&str)> = by_verb.iter().collect();
entries.sort_by_key(|(n, _)| **n);
let mut out = String::new();
for (name, desc) in entries {
out.push_str(" ");
out.push_str(name);
out.push_str(" — ");
out.push_str(desc);
out.push('\n');
}
out
}
/// Dispatches every parsed op through the registry and assembles the batch
/// report.
///
/// All ops are awaited together via `futures::future::join_all`; a failing
/// op yields an `"ok": false` entry and does not affect the others. The
/// returned JSON has a `results` array (one entry per op) and a `summary`
/// object with `total`, `succeeded` and `failed` counts.
async fn run_parsed(&self, ops: Vec<ParsedOp>) -> Value {
    let pending = ops.into_iter().map(|ParsedOp { tool, args }| {
        let registry = self.registry.clone();
        async move {
            let outcome = registry.dispatch(&tool, Value::Object(args)).await;
            match outcome {
                Ok(result) => json!({ "ok": true, "tool": tool, "result": result }),
                Err(RuntimeError::Khive(k)) => {
                    // Prefer the structured form of the error; fall back to
                    // a generic payload if it cannot be serialized.
                    let error_payload = match serde_json::to_value(&k) {
                        Ok(structured) => structured,
                        Err(_) => json!({ "kind": "internal", "message": k.to_string() }),
                    };
                    json!({ "ok": false, "tool": tool, "error": error_payload })
                }
                Err(e) => json!({ "ok": false, "tool": tool, "error": e.to_string() }),
            }
        }
    });
    let results: Vec<Value> = futures::future::join_all(pending).await;
    let succeeded = results
        .iter()
        .filter(|entry| entry["ok"].as_bool() == Some(true))
        .count();
    let total = results.len();
    let failed = total - succeeded;
    json!({
        "results": results,
        "summary": { "total": total, "succeeded": succeeded, "failed": failed },
    })
}
}
#[tool_router]
impl KhiveMcpServer {
    // The single MCP tool: parses the `ops` string, runs every parsed op and
    // returns the batch report as pretty-printed JSON. A parse failure is
    // reported as an invalid-params error, a serialization failure as an
    // internal error.
    #[tool(description = r#"Run one or more khive verbs in a single MCP call.
ops syntax (ADR-020):
Single op : verb(name=value, name=value)
Batch : [verb(...), verb(...)] — parallel, max 100
JSON form : [{"tool":"verb","args":{...}}, ...] — equivalent
Argument values are JSON literals: strings (double-quoted), numbers, booleans,
null, arrays, objects. Strings may contain commas / parens; escape with \".
Response shape:
{
"results": [ {"ok": true, "tool": "verb", "result": {...}}, ... ],
"summary": { "total": N, "succeeded": N, "failed": N }
}
A failed op does NOT abort the batch. Each entry has its own ok / error.
Verb discovery: install the `kg` / `gtd` plugins for usage skills. The verbs
currently registered on this server (pack-derived) are listed below. Argument
schemas live in each pack's docs and SKILL.md files.
Tip: for one-shot calls, the single-op form is the densest. Use batch when
several independent ops can run together (e.g. bulk create + link)."#)]
    async fn request(&self, Parameters(p): Parameters<RequestParams>) -> Result<String, McpError> {
        let batch = parse_request(&p.ops).map_err(dsl_err_to_mcp)?;
        let report = self.run_parsed(batch.ops).await;
        match serde_json::to_string_pretty(&report) {
            Ok(text) => Ok(text),
            Err(e) => Err(McpError::internal_error(format!("serialize: {e}"), None)),
        }
    }
}
/// Converts a request-DSL parse error into an MCP invalid-params error that
/// carries the parse error's display text and no extra data.
fn dsl_err_to_mcp(e: DslError) -> McpError {
    let message = e.to_string();
    McpError::invalid_params(message, None)
}
#[tool_handler]
impl ServerHandler for KhiveMcpServer {
fn get_info(&self) -> ServerInfo {
let catalog = self.verb_catalog();
let instructions = format!(
"khive — request-only MCP surface (ADR-020 + ADR-027). One tool, `request`, \
dispatches verbs through the loaded pack registry. Configure packs via \
KHIVE_PACKS or --pack (built-ins: kg, gtd). Verbs registered on this \
server:\n{catalog}\nFor detailed usage of each verb, see the corresponding \
plugin's SKILL.md files."
);
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
.with_server_info(Implementation::new(
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_VERSION"),
))
.with_instructions(instructions)
}
/// Lists the router's tools, appending the current verb catalog to the
/// description of every tool named `request`.
///
/// The pagination request and the request context are ignored; the result
/// always has no cursor and no metadata.
async fn list_tools(
    &self,
    _request: Option<rmcp::model::PaginatedRequestParams>,
    _context: rmcp::service::RequestContext<rmcp::RoleServer>,
) -> Result<rmcp::model::ListToolsResult, McpError> {
    let mut tools = Self::tool_router().list_all();
    let catalog = self.verb_catalog();
    tools
        .iter_mut()
        .filter(|tool| tool.name == "request")
        .for_each(|tool| {
            // A missing description is treated as empty before the catalog
            // is appended.
            let base = tool.description.as_deref().unwrap_or("");
            let extended = format!("{base}\n\nVerbs registered on this server:\n{catalog}");
            tool.description = Some(std::borrow::Cow::Owned(extended));
        });
    Ok(rmcp::model::ListToolsResult {
        tools,
        meta: None,
        next_cursor: None,
    })
}
}