crtx-mcp 0.1.2

MCP stdio JSON-RPC 2.0 server for Cortex — tool dispatch, ToolHandler trait, gate wiring (ADR 0045).
Documentation
//! `cortex_decay_run` MCP tool handler.
//!
//! Drives a scheduled decay job through its state machine. Mirrors the write
//! path of `cortex decay run` (`crates/cortex-cli/src/cmd/decay.rs`
//! `run_run` fn) via the authoritative runner entrypoints in
//! [`cortex_memory::decay::runner`].
//!
//! Gate: [`GateId::SessionWrite`] — supervised tier, logs at info.
//!
//! Schema:
//! ```text
//! cortex_decay_run(
//!   job_id?:       string,   // run a specific job by id
//!   next_pending?: bool,     // dequeue and run the next pending job
//! ) -> {
//!   job_id:        string,
//!   outcome:       "completed" | "failed",
//!   state_reason?: string,
//! }
//! ```
//!
//! Exactly one of `job_id` or `next_pending: true` must be supplied. The tool
//! does not surface `next_pending: false` — omitting both parameters is
//! refused with `InvalidParams`.
//!
//! LLM-summary jobs are refused at the MCP boundary: the operator attestation
//! file path required by the runner cannot be supplied via the JSON-RPC
//! transport. Use `cortex decay run` via the CLI for those jobs.

use std::sync::{Arc, Mutex};

use chrono::Utc;
use cortex_core::DecayJobId;
use cortex_llm::NoopSummaryBackend;
use cortex_memory::decay::runner::{run_next_pending_job, run_specific_job};
use cortex_store::repo::DecayJobRepo;
use cortex_store::Pool;
use serde_json::{json, Value};
use tracing::info;

use crate::tool_handler::{GateId, ToolError, ToolHandler};

/// MCP tool: `cortex_decay_run`.
#[derive(Debug)]
pub struct CortexDecayRunTool {
    pool: Arc<Mutex<Pool>>,
}

impl CortexDecayRunTool {
    /// Construct the tool over a shared, mutex-guarded store connection.
    #[must_use]
    pub fn new(pool: Arc<Mutex<Pool>>) -> Self {
        Self { pool }
    }
}

impl ToolHandler for CortexDecayRunTool {
    fn name(&self) -> &'static str {
        "cortex_decay_run"
    }

    fn gate_set(&self) -> &'static [GateId] {
        &[GateId::SessionWrite]
    }

    fn call(&self, params: Value) -> Result<Value, ToolError> {
        // ── 1. Resolve the dispatch mode ──────────────────────────────────
        let job_id_str = params["job_id"].as_str().filter(|s| !s.is_empty());
        let next_pending = params["next_pending"].as_bool().unwrap_or(false);

        if job_id_str.is_none() && !next_pending {
            return Err(ToolError::InvalidParams(
                "one of job_id or next_pending must be supplied".into(),
            ));
        }
        if job_id_str.is_some() && next_pending {
            return Err(ToolError::InvalidParams(
                "job_id and next_pending are mutually exclusive".into(),
            ));
        }

        // ── 2. Parse job_id when present ──────────────────────────────────
        let target_id: Option<DecayJobId> = match job_id_str {
            None => None,
            Some(raw) => {
                let id = raw.parse::<DecayJobId>().map_err(|err| {
                    ToolError::InvalidParams(format!(
                        "job_id `{raw}` is not a valid decay job id: {err}"
                    ))
                })?;
                Some(id)
            }
        };

        let now = Utc::now();

        // ── 3. Acquire the pool and identify the target row ───────────────
        let pool = self
            .pool
            .lock()
            .map_err(|err| ToolError::Internal(format!("pool lock poisoned: {err}")))?;

        // Peek at the job before running so we can surface the id on the
        // response even for the next_pending path.
        let preview_id: DecayJobId = match &target_id {
            Some(id) => {
                let repo = DecayJobRepo::new(&pool);
                match repo.read(id) {
                    Ok(Some(rec)) => {
                        // Refuse LLM-summary jobs: attestation path cannot be
                        // supplied via the MCP transport.
                        if rec.summary_method_wire == "llm_summary" {
                            return Err(ToolError::InvalidParams(format!(
                                "job `{id}` declares summary_method=llm_summary; \
                                 use `cortex decay run --job-id {id} \
                                 --operator-attestation <PATH>` via the CLI"
                            )));
                        }
                        rec.id
                    }
                    Ok(None) => {
                        return Err(ToolError::InvalidParams(format!(
                            "decay job `{id}` not found"
                        )));
                    }
                    Err(err) => {
                        return Err(ToolError::Internal(format!(
                            "failed to load decay job `{id}`: {err}"
                        )));
                    }
                }
            }
            None => {
                // Peek at the next pending job to check its summary method and
                // record the id for the response.
                let repo = DecayJobRepo::new(&pool);
                let pending = repo.list_pending_ready(now).map_err(|err| {
                    ToolError::Internal(format!("failed to scan pending queue: {err}"))
                })?;
                let rec = pending.into_iter().next().ok_or_else(|| {
                    ToolError::InvalidParams(
                        "no pending decay jobs whose scheduled_for is in the past".into(),
                    )
                })?;
                if rec.summary_method_wire == "llm_summary" {
                    return Err(ToolError::InvalidParams(
                        "next pending job declares summary_method=llm_summary; \
                         use `cortex decay run --next-pending \
                         --operator-attestation <PATH>` via the CLI"
                            .into(),
                    ));
                }
                rec.id
            }
        };

        info!(
            job_id = %preview_id,
            mode = if target_id.is_some() { "specific" } else { "next_pending" },
            "cortex_decay_run via MCP"
        );

        // ── 4. Invoke the runner ──────────────────────────────────────────
        let runner_result = if let Some(id) = &target_id {
            run_specific_job(&pool, id, now, &NoopSummaryBackend)
        } else {
            match run_next_pending_job(&pool, now, &NoopSummaryBackend) {
                Ok(Some(_)) => Ok(()),
                Ok(None) => {
                    // Queue became empty between the peek and the dispatch —
                    // treat as a benign race and surface it as InvalidParams.
                    return Err(ToolError::InvalidParams(
                        "no pending decay jobs whose scheduled_for is in the past".into(),
                    ));
                }
                Err(err) => Err(err),
            }
        };

        // ── 5. Re-read the post-run state ─────────────────────────────────
        let repo = DecayJobRepo::new(&pool);
        let post = repo.read(&preview_id).map_err(|err| {
            ToolError::Internal(format!(
                "failed to re-read decay job `{preview_id}` after dispatch: {err}"
            ))
        })?;
        let post = post.ok_or_else(|| {
            ToolError::Internal(format!(
                "decay job `{preview_id}` disappeared after dispatch (substrate drift)"
            ))
        })?;

        // ── 6. Map runner result → outcome ────────────────────────────────
        let outcome = match runner_result {
            Ok(()) => "completed",
            Err(_) => "failed",
        };

        Ok(json!({
            "job_id":       post.id.to_string(),
            "outcome":      outcome,
            "state_reason": post.state_reason,
        }))
    }
}