mod tools;
use rmcp::handler::server::router::tool::ToolRouter;
use rmcp::handler::server::wrapper::Parameters;
use rmcp::model::{CallToolResult, Content, ServerCapabilities, ServerInfo};
use rmcp::{tool, tool_handler, tool_router, ErrorData as McpError, ServerHandler, ServiceExt};
use serde::de::DeserializeOwned;
use serde_json::Value;
use uuid::Uuid;
use crate::bootstrap;
use crate::config::Config;
use crate::domain::CandidateStatus;
use crate::error::AppError;
use crate::state::AppState;
use tools::*;
const INSTRUCTIONS: &str = "Decision Cockpit is a product-decision memory system. \
You operate it by creating extraction candidates, drift signals, and memo drafts. \
You do NOT create canonical decisions directly: create candidates and let a human accept them. \
Extraction flow: list_recent_documents(status=\"new\") -> get_document -> \
create_extraction_candidate (one per item). For assumption/action/evidence candidates, set \
\"relates_to_decision_id\" in the payload so they auto-link to their decision on accept; or \
call create_relation afterwards to connect existing entities. \
Drift flow: list_decisions -> get_decision_context -> create_drift_signal. \
Memo flow: gather context (read existing memos with list_memos/get_memo so you build on prior \
conclusions), then create_memo as a draft; refine it with update_memo and set status=final when done. \
Use open_dashboard to launch the review UI in the browser.";
#[derive(Clone)]
pub struct CockpitMcp {
state: AppState,
config: Config,
#[allow(dead_code)]
tool_router: ToolRouter<Self>,
}
impl CockpitMcp {
pub fn new(state: AppState, config: Config) -> Self {
Self {
state,
config,
tool_router: Self::tool_router(),
}
}
}
#[tool_router]
impl CockpitMcp {
#[tool(description = "Store a source document (product context) and return its id.")]
async fn create_document(
&self,
Parameters(p): Parameters<CreateDocumentParams>,
) -> Result<CallToolResult, McpError> {
let doc = crate::services::documents::create_document(
&self.state.pool,
&p.title,
&p.source_type,
&p.raw_text,
)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({ "document_id": doc.id, "document": doc }))
}
#[tool(description = "Fetch a single document by id, including its raw text.")]
async fn get_document(
&self,
Parameters(p): Parameters<GetDocumentParams>,
) -> Result<CallToolResult, McpError> {
let id = parse_uuid(&p.document_id)?;
let doc = crate::services::documents::get_document(&self.state.pool, id)
.await
.map_err(to_mcp)?;
ok(serde_json::to_value(doc).map_err(internal)?)
}
#[tool(
description = "List the most recent documents (summaries, newest first). Pass \
status=\"new\" to fetch only documents that have not been extracted yet."
)]
async fn list_recent_documents(
&self,
Parameters(p): Parameters<ListRecentDocumentsParams>,
) -> Result<CallToolResult, McpError> {
let docs = crate::services::documents::list_recent_documents(
&self.state.pool,
p.status.as_deref(),
p.limit,
)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({ "documents": docs }))
}
#[tool(
description = "Create a pending extraction candidate for a document. candidate_type is one \
of: decision, assumption, action, evidence, risk, goal, open_question. payload is a JSON \
object whose shape depends on the type."
)]
async fn create_extraction_candidate(
&self,
Parameters(p): Parameters<CreateCandidateParams>,
) -> Result<CallToolResult, McpError> {
let document_id = match p.document_id.as_deref() {
Some(s) => Some(parse_uuid(s)?),
None => None,
};
let candidate_type = parse_enum(&p.candidate_type, "candidate_type")?;
let candidate = crate::services::candidates::create_candidate(
&self.state.pool,
document_id,
candidate_type,
p.payload,
)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({
"candidate_id": candidate.id,
"status": candidate.status,
}))
}
#[tool(description = "List canonical decisions, optionally filtered by status.")]
async fn list_decisions(
&self,
Parameters(p): Parameters<ListDecisionsParams>,
) -> Result<CallToolResult, McpError> {
let decisions = crate::services::graph::list_decisions(
&self.state.pool,
p.status.as_deref(),
p.limit,
)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({ "decisions": decisions }))
}
#[tool(
description = "Get a decision plus its linked assumptions, actions, evidence, relations, \
and drift signals."
)]
async fn get_decision_context(
&self,
Parameters(p): Parameters<GetDecisionContextParams>,
) -> Result<CallToolResult, McpError> {
let id = parse_uuid(&p.decision_id)?;
let context = crate::services::graph::get_decision_context(&self.state.pool, id)
.await
.map_err(to_mcp)?;
ok(serde_json::to_value(context).map_err(internal)?)
}
#[tool(
description = "Create a drift signal (a reviewable suggestion). drift_type: \
assumption_challenged, decision_contradicted, action_stale, goal_mismatch, \
evidence_outdated. target_entity_type: decision, assumption, action, evidence, \
drift_signal, memo. severity: low, medium, high."
)]
async fn create_drift_signal(
&self,
Parameters(p): Parameters<CreateDriftSignalParams>,
) -> Result<CallToolResult, McpError> {
let input = crate::services::drift::NewDriftSignalInput {
drift_type: parse_enum(&p.drift_type, "drift_type")?,
target_entity_id: parse_uuid(&p.target_entity_id)?,
target_entity_type: parse_enum(&p.target_entity_type, "target_entity_type")?,
summary: p.summary,
severity: parse_enum(&p.severity, "severity")?,
explanation: p.explanation,
};
let signal = crate::services::drift::create_drift_signal(&self.state.pool, input)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({
"drift_signal_id": signal.id,
"status": signal.status,
}))
}
#[tool(
description = "Save a memo (markdown body). The agent writes the markdown itself. \
Defaults to status=draft; set status=final to publish."
)]
async fn create_memo(
&self,
Parameters(p): Parameters<CreateMemoParams>,
) -> Result<CallToolResult, McpError> {
let memo = crate::services::memos::create_memo(
&self.state.pool,
&p.memo_type,
&p.title,
&p.body_markdown,
p.status.as_deref(),
)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({ "memo_id": memo.id, "status": memo.status }))
}
#[tool(
description = "Update a memo: fill in a draft's body, rename it, or mark it final. \
Only the provided fields change."
)]
async fn update_memo(
&self,
Parameters(p): Parameters<UpdateMemoParams>,
) -> Result<CallToolResult, McpError> {
let id = parse_uuid(&p.memo_id)?;
let memo = crate::services::memos::update_memo(
&self.state.pool,
id,
p.title.as_deref(),
p.memo_type.as_deref(),
p.body_markdown.as_deref(),
p.status.as_deref(),
)
.await
.map_err(to_mcp)?;
ok(serde_json::to_value(memo).map_err(internal)?)
}
#[tool(description = "List recent memos (newest first).")]
async fn list_memos(
&self,
Parameters(p): Parameters<ListLimitParams>,
) -> Result<CallToolResult, McpError> {
let memos = crate::services::memos::list_memos(&self.state.pool, p.limit)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({ "memos": memos }))
}
#[tool(description = "Fetch a single memo (including its full markdown body) by id.")]
async fn get_memo(
&self,
Parameters(p): Parameters<GetMemoParams>,
) -> Result<CallToolResult, McpError> {
let id = parse_uuid(&p.memo_id)?;
let memo = crate::services::memos::get_memo(&self.state.pool, id)
.await
.map_err(to_mcp)?;
ok(serde_json::to_value(memo).map_err(internal)?)
}
#[tool(
description = "Link two canonical entities in the decision graph. relation_type: \
supports, depends_on, challenges, contradicts, produces, mitigates, replaces, \
related_to. entity types: decision, assumption, action, evidence, drift_signal, memo. \
Use this to attach assumptions/actions/evidence to a decision."
)]
async fn create_relation(
&self,
Parameters(p): Parameters<CreateRelationParams>,
) -> Result<CallToolResult, McpError> {
let input = crate::services::relations::NewRelationInput {
from_entity_id: parse_uuid(&p.from_entity_id)?,
from_entity_type: parse_enum(&p.from_entity_type, "from_entity_type")?,
to_entity_id: parse_uuid(&p.to_entity_id)?,
to_entity_type: parse_enum(&p.to_entity_type, "to_entity_type")?,
relation_type: parse_enum(&p.relation_type, "relation_type")?,
};
let relation = crate::services::relations::create_relation(&self.state.pool, input)
.await
.map_err(to_mcp)?;
ok(serde_json::to_value(relation).map_err(internal)?)
}
#[tool(description = "List open (unreviewed) drift signals.")]
async fn list_open_drift_signals(
&self,
Parameters(p): Parameters<ListLimitParams>,
) -> Result<CallToolResult, McpError> {
let signals =
crate::services::drift::list_open_drift_signals(&self.state.pool, p.limit)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({ "drift_signals": signals }))
}
#[tool(
description = "Open the Decision Cockpit dashboard in the system browser. \
The HTTP server starts automatically when this MCP process is enabled."
)]
async fn open_dashboard(&self) -> Result<CallToolResult, McpError> {
let url = self.config.dashboard_url();
bootstrap::open_browser(&url).map_err(|e| {
tracing::warn!(error = %e, "could not open browser automatically");
McpError::internal_error(
format!("{e} Open {url} manually in your browser."),
None,
)
})?;
let pending = crate::services::candidates::list_candidates(
&self.state.pool,
Some(CandidateStatus::Pending),
None,
)
.await
.map_err(to_mcp)?;
let open_drift = crate::services::drift::list_open_drift_signals(&self.state.pool, None)
.await
.map_err(to_mcp)?;
ok(serde_json::json!({
"url": url,
"pending_candidates": pending.len(),
"open_drift_signals": open_drift.len(),
}))
}
}
#[tool_handler]
impl ServerHandler for CockpitMcp {
fn get_info(&self) -> ServerInfo {
let mut info = ServerInfo::default();
info.instructions = Some(INSTRUCTIONS.to_string());
info.capabilities = ServerCapabilities::builder().enable_tools().build();
info
}
}
pub async fn serve_stdio(state: AppState, config: Config) -> anyhow::Result<()> {
let service = CockpitMcp::new(state, config)
.serve(rmcp::transport::stdio())
.await?;
service.waiting().await?;
Ok(())
}
fn ok(value: Value) -> Result<CallToolResult, McpError> {
let text = serde_json::to_string_pretty(&value).map_err(internal)?;
Ok(CallToolResult::success(vec![Content::text(text)]))
}
fn parse_uuid(s: &str) -> Result<Uuid, McpError> {
Uuid::parse_str(s.trim())
.map_err(|_| McpError::invalid_params(format!("`{s}` is not a valid UUID"), None))
}
fn parse_enum<T: DeserializeOwned>(s: &str, what: &str) -> Result<T, McpError> {
serde_json::from_value::<T>(Value::String(s.trim().to_string()))
.map_err(|_| McpError::invalid_params(format!("`{s}` is not a valid {what}"), None))
}
fn internal<E: std::fmt::Display>(e: E) -> McpError {
McpError::internal_error(e.to_string(), None)
}
fn to_mcp(e: AppError) -> McpError {
match e {
AppError::Validation(m) => McpError::invalid_params(m, None),
AppError::NotFound(m) => McpError::invalid_params(m, None),
AppError::Conflict(m) => McpError::invalid_params(m, None),
AppError::Database(_) | AppError::Serialization(_) | AppError::Other(_) => {
tracing::error!(error = %e, "mcp tool internal error");
McpError::internal_error("internal server error".to_string(), None)
}
}
}