Skip to main content

cortex_mcp/tools/
decay_run.rs

//! `cortex_decay_run` MCP tool handler.
//!
//! Drives a scheduled decay job through its state machine. Mirrors the write
//! path of `cortex decay run` (the `run_run` fn in
//! `crates/cortex-cli/src/cmd/decay.rs`) via the authoritative runner
//! entrypoints in [`cortex_memory::decay::runner`].
//!
//! Gate: [`GateId::SessionWrite`] — supervised tier, logs at info.
//!
//! Schema:
//! ```text
//! cortex_decay_run(
//!   job_id?:       string,   // run a specific job by id
//!   next_pending?: bool,     // dequeue and run the next pending job
//! ) -> {
//!   job_id:        string,
//!   outcome:       "completed" | "failed",
//!   state_reason?: string,
//! }
//! ```
//!
//! Exactly one of `job_id` or `next_pending: true` must be supplied.
//! `next_pending: false` is treated the same as omitting the flag, and an
//! empty `job_id` string is treated as absent. Supplying neither selector, or
//! supplying both, is refused with `InvalidParams`.
//!
//! LLM-summary jobs are refused at the MCP boundary: the operator attestation
//! file path required by the runner cannot be supplied via the JSON-RPC
//! transport. Use `cortex decay run` via the CLI for those jobs.
29
30use std::sync::{Arc, Mutex};
31
32use chrono::Utc;
33use cortex_core::DecayJobId;
34use cortex_llm::NoopSummaryBackend;
35use cortex_memory::decay::runner::{run_next_pending_job, run_specific_job};
36use cortex_store::repo::DecayJobRepo;
37use cortex_store::Pool;
38use serde_json::{json, Value};
39use tracing::info;
40
41use crate::tool_handler::{GateId, ToolError, ToolHandler};
42
43/// MCP tool: `cortex_decay_run`.
44#[derive(Debug)]
45pub struct CortexDecayRunTool {
46    pool: Arc<Mutex<Pool>>,
47}
48
49impl CortexDecayRunTool {
50    /// Construct the tool over a shared, mutex-guarded store connection.
51    #[must_use]
52    pub fn new(pool: Arc<Mutex<Pool>>) -> Self {
53        Self { pool }
54    }
55}
56
57impl ToolHandler for CortexDecayRunTool {
58    fn name(&self) -> &'static str {
59        "cortex_decay_run"
60    }
61
62    fn gate_set(&self) -> &'static [GateId] {
63        &[GateId::SessionWrite]
64    }
65
66    fn call(&self, params: Value) -> Result<Value, ToolError> {
67        // ── 1. Resolve the dispatch mode ──────────────────────────────────
68        let job_id_str = params["job_id"].as_str().filter(|s| !s.is_empty());
69        let next_pending = params["next_pending"].as_bool().unwrap_or(false);
70
71        if job_id_str.is_none() && !next_pending {
72            return Err(ToolError::InvalidParams(
73                "one of job_id or next_pending must be supplied".into(),
74            ));
75        }
76        if job_id_str.is_some() && next_pending {
77            return Err(ToolError::InvalidParams(
78                "job_id and next_pending are mutually exclusive".into(),
79            ));
80        }
81
82        // ── 2. Parse job_id when present ──────────────────────────────────
83        let target_id: Option<DecayJobId> = match job_id_str {
84            None => None,
85            Some(raw) => {
86                let id = raw.parse::<DecayJobId>().map_err(|err| {
87                    ToolError::InvalidParams(format!(
88                        "job_id `{raw}` is not a valid decay job id: {err}"
89                    ))
90                })?;
91                Some(id)
92            }
93        };
94
95        let now = Utc::now();
96
97        // ── 3. Acquire the pool and identify the target row ───────────────
98        let pool = self
99            .pool
100            .lock()
101            .map_err(|err| ToolError::Internal(format!("pool lock poisoned: {err}")))?;
102
103        // Peek at the job before running so we can surface the id on the
104        // response even for the next_pending path.
105        let preview_id: DecayJobId = match &target_id {
106            Some(id) => {
107                let repo = DecayJobRepo::new(&pool);
108                match repo.read(id) {
109                    Ok(Some(rec)) => {
110                        // Refuse LLM-summary jobs: attestation path cannot be
111                        // supplied via the MCP transport.
112                        if rec.summary_method_wire == "llm_summary" {
113                            return Err(ToolError::InvalidParams(format!(
114                                "job `{id}` declares summary_method=llm_summary; \
115                                 use `cortex decay run --job-id {id} \
116                                 --operator-attestation <PATH>` via the CLI"
117                            )));
118                        }
119                        rec.id
120                    }
121                    Ok(None) => {
122                        return Err(ToolError::InvalidParams(format!(
123                            "decay job `{id}` not found"
124                        )));
125                    }
126                    Err(err) => {
127                        return Err(ToolError::Internal(format!(
128                            "failed to load decay job `{id}`: {err}"
129                        )));
130                    }
131                }
132            }
133            None => {
134                // Peek at the next pending job to check its summary method and
135                // record the id for the response.
136                let repo = DecayJobRepo::new(&pool);
137                let pending = repo.list_pending_ready(now).map_err(|err| {
138                    ToolError::Internal(format!("failed to scan pending queue: {err}"))
139                })?;
140                let rec = pending.into_iter().next().ok_or_else(|| {
141                    ToolError::InvalidParams(
142                        "no pending decay jobs whose scheduled_for is in the past".into(),
143                    )
144                })?;
145                if rec.summary_method_wire == "llm_summary" {
146                    return Err(ToolError::InvalidParams(
147                        "next pending job declares summary_method=llm_summary; \
148                         use `cortex decay run --next-pending \
149                         --operator-attestation <PATH>` via the CLI"
150                            .into(),
151                    ));
152                }
153                rec.id
154            }
155        };
156
157        info!(
158            job_id = %preview_id,
159            mode = if target_id.is_some() { "specific" } else { "next_pending" },
160            "cortex_decay_run via MCP"
161        );
162
163        // ── 4. Invoke the runner ──────────────────────────────────────────
164        let runner_result = if let Some(id) = &target_id {
165            run_specific_job(&pool, id, now, &NoopSummaryBackend)
166        } else {
167            match run_next_pending_job(&pool, now, &NoopSummaryBackend) {
168                Ok(Some(_)) => Ok(()),
169                Ok(None) => {
170                    // Queue became empty between the peek and the dispatch —
171                    // treat as a benign race and surface it as InvalidParams.
172                    return Err(ToolError::InvalidParams(
173                        "no pending decay jobs whose scheduled_for is in the past".into(),
174                    ));
175                }
176                Err(err) => Err(err),
177            }
178        };
179
180        // ── 5. Re-read the post-run state ─────────────────────────────────
181        let repo = DecayJobRepo::new(&pool);
182        let post = repo.read(&preview_id).map_err(|err| {
183            ToolError::Internal(format!(
184                "failed to re-read decay job `{preview_id}` after dispatch: {err}"
185            ))
186        })?;
187        let post = post.ok_or_else(|| {
188            ToolError::Internal(format!(
189                "decay job `{preview_id}` disappeared after dispatch (substrate drift)"
190            ))
191        })?;
192
193        // ── 6. Map runner result → outcome ────────────────────────────────
194        let outcome = match runner_result {
195            Ok(()) => "completed",
196            Err(_) => "failed",
197        };
198
199        Ok(json!({
200            "job_id":       post.id.to_string(),
201            "outcome":      outcome,
202            "state_reason": post.state_reason,
203        }))
204    }
205}