use std::sync::Arc;
use rmcp::handler::server::wrapper::{Json, Parameters};
use rmcp::model::{Implementation, InitializeResult, ProtocolVersion, ServerCapabilities};
use rmcp::transport::stdio;
use rmcp::{ErrorData as McpError, ServerHandler, ServiceExt, tool, tool_handler, tool_router};
use serde_json::{Map, Value};
use crate::tool_handler::ToolError;
use crate::tool_registry::ToolRegistry;
/// JSON-object argument map of a tool call; every `#[tool]` method in this
/// file receives it as `Parameters<ToolArgs>`.
type ToolArgs = Map<String, Value>;
/// JSON-object body of a successful tool call; `CortexServer::dispatch`
/// returns it wrapped in `Json`.
type ToolResultBody = Map<String, Value>;
/// MCP server front-end that routes tool calls to a shared [`ToolRegistry`].
///
/// Cloning is cheap: the registry sits behind an [`Arc`], so every clone
/// refers to the same registry instance.
///
/// `Debug` is derived rather than hand-written; for a struct with one named
/// field the derive emits the same `CortexServer { registry: .. }` output the
/// previous manual `debug_struct` implementation produced.
#[derive(Clone, Debug)]
pub struct CortexServer {
    /// Registry that resolves a tool name to its handler.
    registry: Arc<ToolRegistry>,
}
impl CortexServer {
    /// Builds a server that serves tool calls from `registry`.
    #[must_use]
    pub fn new(registry: Arc<ToolRegistry>) -> Self {
        Self { registry }
    }

    /// Routes one tool call through the registry and converts the outcome
    /// into the shape the `#[tool]` methods return.
    ///
    /// * `name` - registry key of the tool to invoke.
    /// * `params` - argument object, handed to the registry as a
    ///   `Value::Object`.
    ///
    /// # Errors
    ///
    /// * an internal error when no tool is registered under `name`;
    /// * whatever `tool_error_to_mcp` yields when the tool itself fails;
    /// * an internal error when the tool succeeds with a non-object value.
    fn dispatch(
        &self,
        name: &'static str,
        params: ToolArgs,
    ) -> Result<Json<ToolResultBody>, McpError> {
        // `None` from the registry means the name is unknown to it.
        let Some(outcome) = self.registry.dispatch(name, Value::Object(params)) else {
            tracing::error!(
                tool = name,
                "mcp: tool advertised by #[tool] but not registered in ToolRegistry"
            );
            return Err(McpError::internal_error(
                format!("tool '{name}' not registered"),
                None,
            ));
        };

        let value = outcome.map_err(|err| tool_error_to_mcp(name, err))?;

        match value {
            Value::Object(body) => Ok(Json(body)),
            other => {
                // The result type can only carry a JSON object, so anything
                // else is reported as a server-side failure.
                let kind = value_kind(&other);
                tracing::error!(
                    tool = name,
                    value_kind = kind,
                    "mcp: tool returned non-object value (violates output schema invariant)"
                );
                Err(McpError::internal_error(
                    format!("tool '{name}' returned non-object value (kind: {})", kind),
                    None,
                ))
            }
        }
    }
}
/// Returns a short, static label for the JSON type of `v`.
///
/// Used to describe unexpected tool results in logs and error messages.
fn value_kind(v: &Value) -> &'static str {
    // Exhaustive on purpose: no wildcard arm, so a new variant fails to compile.
    match v {
        Value::Object(..) => "object",
        Value::Array(..) => "array",
        Value::String(..) => "string",
        Value::Number(..) => "number",
        Value::Bool(..) => "bool",
        Value::Null => "null",
    }
}
fn tool_error_to_mcp(tool_name: &str, err: ToolError) -> McpError {
match err {
ToolError::InvalidParams(msg) => McpError::invalid_params(msg, None),
ToolError::PolicyRejected(msg) => McpError::invalid_params(msg, None),
ToolError::SizeLimitExceeded(msg) => McpError::invalid_params(msg, None),
ToolError::Internal(msg) => {
tracing::warn!(tool = tool_name, error = %msg, "mcp: tool internal error");
McpError::internal_error(msg, None)
}
}
}
// MCP tool surface of `CortexServer`.
//
// Every method below is a thin adapter: it unwraps the argument map from
// `Parameters<ToolArgs>` and forwards it unchanged to `CortexServer::dispatch`
// under the same name that is advertised in `#[tool(name = ...)]`. The string
// passed to `dispatch` must therefore stay identical to the advertised name;
// `dispatch` answers with an internal error when the registry has no entry
// for the name it is given.
//
// Plain `//` comments are used here on purpose.
// NOTE(review): the `#[tool]` macro may treat `///` doc comments as
// description text — confirm against the rmcp docs before adding any.
//
// NOTE(review): `dispatch` is a synchronous call made from `async fn`s;
// whether registry handlers can block for long is not visible in this file —
// confirm.
#[tool_router]
impl CortexServer {
#[tool(
name = "cortex_search",
description = "Find active memories matching the query. FTS5 by default; \
set `semantic: true` for Ollama-embedding similarity search. \
Returns top-K with relevance scores."
)]
async fn cortex_search(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_search", p)
}
#[tool(
name = "cortex_context",
description = "Build a context pack for the current session. Optionally include \
doctrine snippets and filter by tag, domain, or query."
)]
async fn cortex_context(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_context", p)
}
#[tool(
name = "cortex_memory_health",
description = "Return aggregate counts for active and quarantined memories: \
total, stale (>30 days old), unvalidated, and quarantined."
)]
async fn cortex_memory_health(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_memory_health", p)
}
#[tool(
name = "cortex_config",
description = "Return the active LLM and embedding backend configuration \
(Ollama / OpenAI-compat / Claude HTTP) loaded from cortex.toml."
)]
async fn cortex_config(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_config", p)
}
#[tool(
name = "cortex_suggest",
description = "Server-initiated memory suggestions for the current focus. \
Ranks by FTS5 match + salience; never mutates state."
)]
async fn cortex_suggest(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_suggest", p)
}
#[tool(
name = "cortex_memory_list",
description = "Browse active memories with optional tag/domain/status filters \
and a paging cursor. Read-only."
)]
async fn cortex_memory_list(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_memory_list", p)
}
#[tool(
name = "cortex_memory_outcome",
description = "Mark a specific memory as `helpful` or `not_helpful` for outcome \
tracking. Logs a structured outcome record (ADR 0020 §6)."
)]
async fn cortex_memory_outcome(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_memory_outcome", p)
}
#[tool(
name = "cortex_decay_status",
description = "Inspect the decay job queue: pending evictions and the next \
scheduled decay window."
)]
async fn cortex_decay_status(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_decay_status", p)
}
#[tool(
name = "cortex_doctor",
description = "Run health checks on the store, event log, and configured \
backends. Stores the result for `cortex doctor` to read back."
)]
async fn cortex_doctor(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_doctor", p)
}
#[tool(
name = "cortex_audit_verify",
description = "Verify the JSONL audit log's hash chain end-to-end. Returns \
pass/fail and the first divergence offset on failure."
)]
async fn cortex_audit_verify(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_audit_verify", p)
}
#[tool(
name = "cortex_reflect",
description = "Run a reflection pass over a session trace (optionally with a \
live LLM via `live_reflect: true`). Returns memory candidates."
)]
async fn cortex_reflect(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_reflect", p)
}
#[tool(
name = "cortex_models_list",
description = "List models available to the configured backends: pulled Ollama \
tags and the compile-time Claude allowlist."
)]
async fn cortex_models_list(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_models_list", p)
}
#[tool(
name = "cortex_memory_embed",
description = "Enrich pending memory rows with Ollama embeddings. Idempotent; \
`preview: true` reports the candidate set without writing."
)]
async fn cortex_memory_embed(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_memory_embed", p)
}
#[tool(
name = "cortex_memory_note",
description = "Store an operator-attested fact directly as an active memory. \
Bypasses the reflection pipeline. Required: `claim` (non-empty)."
)]
async fn cortex_memory_note(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_memory_note", p)
}
#[tool(
name = "cortex_session_close",
description = "Index the current session's events into pending memories. \
Use `live_reflect: true` for an LLM pass; otherwise heuristic only."
)]
async fn cortex_session_close(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_session_close", p)
}
// The three tools below are the ones whose descriptions state that an
// operator confirmation token is required. This adapter layer does not check
// the token itself; it only forwards the arguments.
#[tool(
name = "cortex_memory_accept",
description = "Promote a specific pending memory candidate to active. Requires \
the operator confirmation token printed to stderr (ADR 0047)."
)]
async fn cortex_memory_accept(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_memory_accept", p)
}
#[tool(
name = "cortex_admit_axiom",
description = "Admit a pinned-authority axiom into the ledger. Requires the \
operator confirmation token (ADR 0026 §4)."
)]
async fn cortex_admit_axiom(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_admit_axiom", p)
}
#[tool(
name = "cortex_session_commit",
description = "Activate the current session's pending_mcp_commit memories. \
Requires the operator confirmation token printed to stderr \
at server startup (ADR 0047 §3)."
)]
async fn cortex_session_commit(
&self,
Parameters(p): Parameters<ToolArgs>,
) -> Result<Json<ToolResultBody>, McpError> {
self.dispatch("cortex_session_commit", p)
}
}
#[tool_handler]
impl ServerHandler for CortexServer {
    /// Builds the initialize response: tools-only capabilities, protocol
    /// version `V_2025_06_18`, the server identity (`cortex` plus the crate
    /// version baked in at compile time) and the operator instructions.
    fn get_info(&self) -> rmcp::model::ServerInfo {
        const INSTRUCTIONS: &str = "Cortex MCP server. Memory mutations and session commits \
            require an operator-issued confirmation token printed to \
            stderr at startup (ADR 0047). Paste the token when prompted \
            for `cortex_session_commit` or `cortex_memory_accept`. \
            Sensitivity-gated context can be requested via \
            `cortex_context` and `cortex_search`.";

        let identity = Implementation::new(
            String::from("cortex"),
            String::from(env!("CARGO_PKG_VERSION")),
        );
        let tools_only = ServerCapabilities::builder().enable_tools().build();

        InitializeResult::new(tools_only)
            .with_protocol_version(ProtocolVersion::V_2025_06_18)
            .with_server_info(identity)
            .with_instructions(INSTRUCTIONS)
    }
}
/// Serves `server` over the stdio transport and returns once the service's
/// `waiting()` future resolves.
///
/// # Errors
///
/// Returns an internal error when the service fails to start
/// (`serve_stdio init failed: ..`) or when waiting on the running service
/// fails (`serve_stdio loop failed: ..`).
pub async fn serve_stdio(server: CortexServer) -> Result<(), McpError> {
    tracing::info!("cortex mcp: rmcp 1.7 stdio server starting");

    let running = match server.serve(stdio()).await {
        Ok(running) => running,
        Err(e) => {
            return Err(McpError::internal_error(
                format!("serve_stdio init failed: {e}"),
                None,
            ));
        }
    };

    // The success value of `waiting()` is not needed; only failure matters.
    if let Err(e) = running.waiting().await {
        return Err(McpError::internal_error(
            format!("serve_stdio loop failed: {e}"),
            None,
        ));
    }

    tracing::info!("cortex mcp: rmcp stdio server shutdown (EOF)");
    Ok(())
}
#[cfg(test)]
mod tests {
    //! Unit tests for the dispatch adapter and the error mapping.
    //!
    //! Every fake tool registers under the name `cortex_search`; the name is
    //! only a registry key here and has no further meaning for the tests.

    use super::*;
    use crate::tool_handler::{GateId, ToolHandler};
    use rmcp::model::ErrorCode;

    /// Fake tool that returns its parameters unchanged.
    struct EchoTool;

    impl ToolHandler for EchoTool {
        fn name(&self) -> &'static str {
            "cortex_search"
        }
        fn gate_set(&self) -> &'static [GateId] {
            &[GateId::FtsRead]
        }
        fn call(&self, params: Value) -> Result<Value, ToolError> {
            Ok(params)
        }
    }

    /// Server whose registry contains only [`EchoTool`].
    fn server_with_echo() -> CortexServer {
        let mut registry = ToolRegistry::new();
        registry.register(Box::new(EchoTool));
        CortexServer::new(Arc::new(registry))
    }

    /// Unwraps a JSON object literal into the argument map `dispatch` expects.
    fn args(v: Value) -> ToolArgs {
        match v {
            Value::Object(fields) => fields,
            _ => panic!("test fixture must pass an object"),
        }
    }

    #[test]
    fn dispatch_returns_registry_value_on_success() {
        let server = server_with_echo();
        let result = server
            .dispatch("cortex_search", args(serde_json::json!({"q": "hello"})))
            .expect("registered tool dispatches");
        let Json(body) = result;
        assert_eq!(body.get("q"), Some(&Value::String("hello".into())));
    }

    #[test]
    fn dispatch_unregistered_tool_returns_internal_error() {
        let server = server_with_echo();
        let result = server.dispatch("cortex_missing", ToolArgs::new());
        match result {
            Ok(_) => panic!("unregistered tool must error"),
            Err(err) => {
                // The test name promises an internal error, so pin the code
                // as well as the message.
                assert_eq!(err.code, ErrorCode::INTERNAL_ERROR);
                assert!(
                    err.message.contains("cortex_missing"),
                    "error must name the missing tool: {}",
                    err.message
                );
            }
        }
    }

    #[test]
    fn dispatch_non_object_tool_value_returns_internal_error() {
        struct NonObjectTool;
        impl ToolHandler for NonObjectTool {
            fn name(&self) -> &'static str {
                "cortex_search"
            }
            fn gate_set(&self) -> &'static [GateId] {
                &[GateId::FtsRead]
            }
            fn call(&self, _params: Value) -> Result<Value, ToolError> {
                Ok(Value::String("not an object".into()))
            }
        }
        let mut registry = ToolRegistry::new();
        registry.register(Box::new(NonObjectTool));
        let server = CortexServer::new(Arc::new(registry));
        let result = server.dispatch("cortex_search", ToolArgs::new());
        match result {
            Ok(_) => panic!("non-object tool value must error"),
            Err(err) => {
                assert_eq!(err.code, ErrorCode::INTERNAL_ERROR);
                assert!(
                    err.message.contains("non-object"),
                    "error must say non-object: {}",
                    err.message
                );
            }
        }
    }

    #[test]
    fn dispatch_propagates_tool_error_through_mapping() {
        struct FailingTool;
        impl ToolHandler for FailingTool {
            fn name(&self) -> &'static str {
                "cortex_search"
            }
            fn gate_set(&self) -> &'static [GateId] {
                &[GateId::FtsRead]
            }
            fn call(&self, _params: Value) -> Result<Value, ToolError> {
                Err(ToolError::InvalidParams("missing q".into()))
            }
        }
        let mut registry = ToolRegistry::new();
        registry.register(Box::new(FailingTool));
        let server = CortexServer::new(Arc::new(registry));
        match server.dispatch("cortex_search", ToolArgs::new()) {
            Ok(_) => panic!("failing tool must error"),
            Err(err) => {
                assert_eq!(err.code, ErrorCode::INVALID_PARAMS);
                assert!(
                    err.message.contains("missing q"),
                    "error must carry the tool's message: {}",
                    err.message
                );
            }
        }
    }

    #[test]
    fn tool_error_invalid_params_maps_to_invalid_params() {
        let err = tool_error_to_mcp("t", ToolError::InvalidParams("bad".into()));
        assert_eq!(err.code, ErrorCode::INVALID_PARAMS);
    }

    #[test]
    fn tool_error_policy_rejected_maps_to_invalid_params() {
        let err = tool_error_to_mcp("t", ToolError::PolicyRejected("nope".into()));
        assert_eq!(err.code, ErrorCode::INVALID_PARAMS);
    }

    #[test]
    fn tool_error_size_limit_exceeded_maps_to_invalid_params() {
        let err = tool_error_to_mcp("t", ToolError::SizeLimitExceeded("too big".into()));
        assert_eq!(err.code, ErrorCode::INVALID_PARAMS);
    }

    #[test]
    fn tool_error_internal_maps_to_internal_error() {
        let err = tool_error_to_mcp("t", ToolError::Internal("kaboom".into()));
        assert_eq!(err.code, ErrorCode::INTERNAL_ERROR);
    }

    #[test]
    fn value_kind_labels_every_json_type() {
        assert_eq!(value_kind(&Value::Null), "null");
        assert_eq!(value_kind(&serde_json::json!(true)), "bool");
        assert_eq!(value_kind(&serde_json::json!(1)), "number");
        assert_eq!(value_kind(&serde_json::json!("s")), "string");
        assert_eq!(value_kind(&serde_json::json!([])), "array");
        assert_eq!(value_kind(&serde_json::json!({})), "object");
    }
}