decision_cockpit 0.1.0

Layer — product decision memory with MCP tools and an embedded review dashboard
Documentation
//! MCP stdio server built on the official `rmcp` SDK.
//!
//! Tools are thin adapters over the same service layer used by the HTTP API.
//! IDs and enums are accepted as strings (and parsed here) to keep the
//! generated JSON schemas dependency-light.

mod tools;

use rmcp::handler::server::router::tool::ToolRouter;
use rmcp::handler::server::wrapper::Parameters;
use rmcp::model::{CallToolResult, Content, ServerCapabilities, ServerInfo};
use rmcp::{tool, tool_handler, tool_router, ErrorData as McpError, ServerHandler, ServiceExt};
use serde::de::DeserializeOwned;
use serde_json::Value;
use uuid::Uuid;

use crate::bootstrap;
use crate::config::Config;
use crate::domain::CandidateStatus;
use crate::error::AppError;
use crate::state::AppState;

use tools::*;

/// Instructions surfaced to the agent during MCP `initialize`.
///
/// This text is returned verbatim as `ServerInfo::instructions` by `get_info`. It names
/// individual tools, so it has to be kept in sync with the `#[tool]` methods on `CockpitMcp`.
/// Every source line ends in `\`, so the literal is one continuous line with no embedded
/// newlines.
const INSTRUCTIONS: &str = "Decision Cockpit is a product-decision memory system. \
You operate it by creating extraction candidates, drift signals, and memo drafts. \
You do NOT create canonical decisions directly: create candidates and let a human accept them. \
Extraction flow: list_recent_documents(status=\"new\") -> get_document -> \
create_extraction_candidate (one per item). For assumption/action/evidence candidates, set \
\"relates_to_decision_id\" in the payload so they auto-link to their decision on accept; or \
call create_relation afterwards to connect existing entities. \
Drift flow: list_decisions -> get_decision_context -> create_drift_signal. \
Memo flow: gather context (read existing memos with list_memos/get_memo so you build on prior \
conclusions), then create_memo as a draft; refine it with update_memo and set status=final when done. \
Use open_dashboard to launch the review UI in the browser.";

/// MCP server handler that exposes Decision Cockpit operations as tools.
///
/// Each tool method forwards to a function under `crate::services`, passing `state.pool`.
#[derive(Clone)]
pub struct CockpitMcp {
    // Application state; the tools hand `state.pool` to the service layer.
    state: AppState,
    // Configuration; `open_dashboard` reads the dashboard URL from it.
    config: Config,
    // Read by the generated `#[tool_handler]` routing code.
    #[allow(dead_code)]
    tool_router: ToolRouter<Self>,
}

impl CockpitMcp {
    /// Build a handler from the application state and configuration.
    ///
    /// The tool router is produced by the `#[tool_router]`-generated `Self::tool_router()`.
    pub fn new(state: AppState, config: Config) -> Self {
        let tool_router = Self::tool_router();
        Self {
            state,
            config,
            tool_router,
        }
    }
}

#[tool_router]
impl CockpitMcp {
    #[tool(description = "Store a source document (product context) and return its id.")]
    async fn create_document(
        &self,
        Parameters(p): Parameters<CreateDocumentParams>,
    ) -> Result<CallToolResult, McpError> {
        let doc = crate::services::documents::create_document(
            &self.state.pool,
            &p.title,
            &p.source_type,
            &p.raw_text,
        )
        .await
        .map_err(to_mcp)?;
        ok(serde_json::json!({ "document_id": doc.id, "document": doc }))
    }

    #[tool(description = "Fetch a single document by id, including its raw text.")]
    async fn get_document(
        &self,
        Parameters(p): Parameters<GetDocumentParams>,
    ) -> Result<CallToolResult, McpError> {
        // Reject a malformed id before touching the service layer.
        let document_id = parse_uuid(&p.document_id)?;

        let lookup =
            crate::services::documents::get_document(&self.state.pool, document_id).await;
        let document = lookup.map_err(to_mcp)?;

        let body = serde_json::to_value(document).map_err(internal)?;
        ok(body)
    }

    #[tool(
        description = "List the most recent documents (summaries, newest first). Pass \
        status=\"new\" to fetch only documents that have not been extracted yet."
    )]
    async fn list_recent_documents(
        &self,
        Parameters(p): Parameters<ListRecentDocumentsParams>,
    ) -> Result<CallToolResult, McpError> {
        let docs = crate::services::documents::list_recent_documents(
            &self.state.pool,
            p.status.as_deref(),
            p.limit,
        )
        .await
        .map_err(to_mcp)?;
        ok(serde_json::json!({ "documents": docs }))
    }

    #[tool(
        description = "Create a pending extraction candidate for a document. candidate_type is one \
        of: decision, assumption, action, evidence, risk, goal, open_question. payload is a JSON \
        object whose shape depends on the type."
    )]
    async fn create_extraction_candidate(
        &self,
        Parameters(p): Parameters<CreateCandidateParams>,
    ) -> Result<CallToolResult, McpError> {
        let document_id = match p.document_id.as_deref() {
            Some(s) => Some(parse_uuid(s)?),
            None => None,
        };
        let candidate_type = parse_enum(&p.candidate_type, "candidate_type")?;
        let candidate = crate::services::candidates::create_candidate(
            &self.state.pool,
            document_id,
            candidate_type,
            p.payload,
        )
        .await
        .map_err(to_mcp)?;
        ok(serde_json::json!({
            "candidate_id": candidate.id,
            "status": candidate.status,
        }))
    }

    #[tool(description = "List canonical decisions, optionally filtered by status.")]
    async fn list_decisions(
        &self,
        Parameters(p): Parameters<ListDecisionsParams>,
    ) -> Result<CallToolResult, McpError> {
        let decisions = crate::services::graph::list_decisions(
            &self.state.pool,
            p.status.as_deref(),
            p.limit,
        )
        .await
        .map_err(to_mcp)?;
        ok(serde_json::json!({ "decisions": decisions }))
    }

    #[tool(
        description = "Get a decision plus its linked assumptions, actions, evidence, relations, \
        and drift signals."
    )]
    async fn get_decision_context(
        &self,
        Parameters(p): Parameters<GetDecisionContextParams>,
    ) -> Result<CallToolResult, McpError> {
        // Reject a malformed id before touching the service layer.
        let decision_id = parse_uuid(&p.decision_id)?;

        let lookup =
            crate::services::graph::get_decision_context(&self.state.pool, decision_id).await;
        let context = lookup.map_err(to_mcp)?;

        let body = serde_json::to_value(context).map_err(internal)?;
        ok(body)
    }

    #[tool(
        description = "Create a drift signal (a reviewable suggestion). drift_type: \
        assumption_challenged, decision_contradicted, action_stale, goal_mismatch, \
        evidence_outdated. target_entity_type: decision, assumption, action, evidence, \
        drift_signal, memo. severity: low, medium, high."
    )]
    async fn create_drift_signal(
        &self,
        Parameters(p): Parameters<CreateDriftSignalParams>,
    ) -> Result<CallToolResult, McpError> {
        let input = crate::services::drift::NewDriftSignalInput {
            drift_type: parse_enum(&p.drift_type, "drift_type")?,
            target_entity_id: parse_uuid(&p.target_entity_id)?,
            target_entity_type: parse_enum(&p.target_entity_type, "target_entity_type")?,
            summary: p.summary,
            severity: parse_enum(&p.severity, "severity")?,
            explanation: p.explanation,
        };
        let signal = crate::services::drift::create_drift_signal(&self.state.pool, input)
            .await
            .map_err(to_mcp)?;
        ok(serde_json::json!({
            "drift_signal_id": signal.id,
            "status": signal.status,
        }))
    }

    #[tool(
        description = "Save a memo (markdown body). The agent writes the markdown itself. \
        Defaults to status=draft; set status=final to publish."
    )]
    async fn create_memo(
        &self,
        Parameters(p): Parameters<CreateMemoParams>,
    ) -> Result<CallToolResult, McpError> {
        let memo = crate::services::memos::create_memo(
            &self.state.pool,
            &p.memo_type,
            &p.title,
            &p.body_markdown,
            p.status.as_deref(),
        )
        .await
        .map_err(to_mcp)?;
        ok(serde_json::json!({ "memo_id": memo.id, "status": memo.status }))
    }

    #[tool(
        description = "Update a memo: fill in a draft's body, rename it, or mark it final. \
        Only the provided fields change."
    )]
    async fn update_memo(
        &self,
        Parameters(p): Parameters<UpdateMemoParams>,
    ) -> Result<CallToolResult, McpError> {
        let id = parse_uuid(&p.memo_id)?;
        let memo = crate::services::memos::update_memo(
            &self.state.pool,
            id,
            p.title.as_deref(),
            p.memo_type.as_deref(),
            p.body_markdown.as_deref(),
            p.status.as_deref(),
        )
        .await
        .map_err(to_mcp)?;
        ok(serde_json::to_value(memo).map_err(internal)?)
    }

    #[tool(description = "List recent memos (newest first).")]
    async fn list_memos(
        &self,
        Parameters(p): Parameters<ListLimitParams>,
    ) -> Result<CallToolResult, McpError> {
        // The limit is forwarded to the service unchanged.
        let listing = crate::services::memos::list_memos(&self.state.pool, p.limit).await;
        let memos = listing.map_err(to_mcp)?;

        let response = serde_json::json!({ "memos": memos });
        ok(response)
    }

    #[tool(description = "Fetch a single memo (including its full markdown body) by id.")]
    async fn get_memo(
        &self,
        Parameters(p): Parameters<GetMemoParams>,
    ) -> Result<CallToolResult, McpError> {
        // Reject a malformed id before touching the service layer.
        let memo_id = parse_uuid(&p.memo_id)?;

        let lookup = crate::services::memos::get_memo(&self.state.pool, memo_id).await;
        let memo = lookup.map_err(to_mcp)?;

        let body = serde_json::to_value(memo).map_err(internal)?;
        ok(body)
    }

    #[tool(
        description = "Link two canonical entities in the decision graph. relation_type: \
        supports, depends_on, challenges, contradicts, produces, mitigates, replaces, \
        related_to. entity types: decision, assumption, action, evidence, drift_signal, memo. \
        Use this to attach assumptions/actions/evidence to a decision."
    )]
    async fn create_relation(
        &self,
        Parameters(p): Parameters<CreateRelationParams>,
    ) -> Result<CallToolResult, McpError> {
        let input = crate::services::relations::NewRelationInput {
            from_entity_id: parse_uuid(&p.from_entity_id)?,
            from_entity_type: parse_enum(&p.from_entity_type, "from_entity_type")?,
            to_entity_id: parse_uuid(&p.to_entity_id)?,
            to_entity_type: parse_enum(&p.to_entity_type, "to_entity_type")?,
            relation_type: parse_enum(&p.relation_type, "relation_type")?,
        };
        let relation = crate::services::relations::create_relation(&self.state.pool, input)
            .await
            .map_err(to_mcp)?;
        ok(serde_json::to_value(relation).map_err(internal)?)
    }

    #[tool(description = "List open (unreviewed) drift signals.")]
    async fn list_open_drift_signals(
        &self,
        Parameters(p): Parameters<ListLimitParams>,
    ) -> Result<CallToolResult, McpError> {
        let signals =
            crate::services::drift::list_open_drift_signals(&self.state.pool, p.limit)
                .await
                .map_err(to_mcp)?;
        ok(serde_json::json!({ "drift_signals": signals }))
    }

    #[tool(
        description = "Open the Decision Cockpit dashboard in the system browser. \
        The HTTP server starts automatically when this MCP process is enabled."
    )]
    async fn open_dashboard(&self) -> Result<CallToolResult, McpError> {
        let url = self.config.dashboard_url();

        // A failed launch is logged and returned as an error that carries the URL, so the
        // user can still open the dashboard by hand.
        if let Err(e) = bootstrap::open_browser(&url) {
            tracing::warn!(error = %e, "could not open browser automatically");
            return Err(McpError::internal_error(
                format!("{e} Open {url} manually in your browser."),
                None,
            ));
        }

        // Only the lengths of the two lists are reported back.
        // NOTE(review): both queries pass `None` as the limit; if the services apply a default
        // cap, these counts are capped too — confirm against the service layer.
        let pool = &self.state.pool;
        let pending_candidates =
            crate::services::candidates::list_candidates(pool, Some(CandidateStatus::Pending), None)
                .await
                .map_err(to_mcp)?
                .len();
        let open_drift_signals = crate::services::drift::list_open_drift_signals(pool, None)
            .await
            .map_err(to_mcp)?
            .len();

        ok(serde_json::json!({
            "url": url,
            "pending_candidates": pending_candidates,
            "open_drift_signals": open_drift_signals,
        }))
    }
}

#[tool_handler]
impl ServerHandler for CockpitMcp {
    /// Describe this server: tools capability enabled, plus the agent-facing instructions.
    fn get_info(&self) -> ServerInfo {
        let capabilities = ServerCapabilities::builder().enable_tools().build();

        // Start from the defaults and override only the two fields this server customizes.
        let mut server_info = ServerInfo::default();
        server_info.capabilities = capabilities;
        server_info.instructions = Some(String::from(INSTRUCTIONS));
        server_info
    }
}

/// Run the MCP server on the stdio transport and wait for the session to end.
///
/// # Errors
///
/// Returns an error if the service fails to start on the transport, or if waiting on the
/// running service fails.
pub async fn serve_stdio(state: AppState, config: Config) -> anyhow::Result<()> {
    let handler = CockpitMcp::new(state, config);
    let running = handler.serve(rmcp::transport::stdio()).await?;
    running.waiting().await?;
    Ok(())
}

/// Wrap `value` in a successful tool result holding one pretty-printed JSON text block.
///
/// A serialization failure is reported as an MCP internal error.
fn ok(value: Value) -> Result<CallToolResult, McpError> {
    serde_json::to_string_pretty(&value)
        .map(|rendered| CallToolResult::success(vec![Content::text(rendered)]))
        .map_err(internal)
}

/// Parse `s` as a UUID, ignoring surrounding whitespace.
///
/// On failure returns an `invalid_params` error that quotes the input exactly as received.
fn parse_uuid(s: &str) -> Result<Uuid, McpError> {
    let trimmed = s.trim();
    Uuid::parse_str(trimmed).map_err(|_parse_failure| {
        let message = format!("`{s}` is not a valid UUID");
        McpError::invalid_params(message, None)
    })
}

/// Parse `s` into `T` by deserializing it as a JSON string, ignoring surrounding whitespace.
///
/// `what` names the field in the `invalid_params` error returned when `s` does not
/// deserialize into `T`.
fn parse_enum<T: DeserializeOwned>(s: &str, what: &str) -> Result<T, McpError> {
    let as_json = Value::String(s.trim().to_owned());
    serde_json::from_value(as_json).map_err(|_parse_failure| {
        let message = format!("`{s}` is not a valid {what}");
        McpError::invalid_params(message, None)
    })
}

/// Convert any displayable error into an MCP internal error carrying its message.
fn internal<E: std::fmt::Display>(e: E) -> McpError {
    let message = e.to_string();
    McpError::internal_error(message, None)
}

/// Map a domain error onto the closest MCP error code.
fn to_mcp(e: AppError) -> McpError {
    match e {
        AppError::Validation(m) => McpError::invalid_params(m, None),
        AppError::NotFound(m) => McpError::invalid_params(m, None),
        AppError::Conflict(m) => McpError::invalid_params(m, None),
        AppError::Database(_) | AppError::Serialization(_) | AppError::Other(_) => {
            tracing::error!(error = %e, "mcp tool internal error");
            McpError::internal_error("internal server error".to_string(), None)
        }
    }
}