Skip to main content

cortex_mcp/tools/
decay_schedule.rs

1//! `cortex_decay_schedule` MCP tool handler.
2//!
3//! Enqueues a new decay job. Mirrors the write path of
4//! `cortex decay schedule` (`crates/cortex-cli/src/cmd/decay.rs`
5//! `run_schedule` fn) via [`DecayJobRepo::insert`].
6//!
7//! Gate: [`GateId::SessionWrite`] — supervised tier, logs at info.
8//!
9//! Schema:
10//! ```text
11//! cortex_decay_schedule(
12//!   kind:           "episode_compression" | "candidate_compression" | "expired_principle_review",
13//!   summary_method: "deterministic_concatenate" | "llm_summary",
14//!   source_ids:     [string],
15//!   scheduled_for?: string,   // RFC3339; defaults to now
16//! ) -> {
17//!   job_id:        string,
18//!   kind:          string,
19//!   state:         "pending",
20//!   scheduled_for: string,
21//! }
22//! ```
23//!
24//! The `source_ids` array is interpreted according to `kind`:
25//! - `episode_compression` — episode identifiers (`ep_...`).
26//! - `candidate_compression` — candidate memory identifiers (`mem_...`).
27//! - `expired_principle_review` — exactly one principle identifier (`prn_...`).
28//!
29//! LLM-summary jobs (`summary_method = "llm_summary"`) are refused at the MCP
30//! boundary: the attestation path required by the runner cannot be supplied via
31//! the JSON-RPC transport. Use `cortex decay schedule --summary-method llm` via
32//! the CLI for those jobs.
33
34use std::sync::{Arc, Mutex};
35
36use chrono::{DateTime, Utc};
37use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
38use cortex_memory::decay::{
39    DecayJob, DecayJobKind, DecayJobState, SummaryMethod, SUMMARY_METHOD_NONE_WIRE,
40};
41use cortex_store::repo::{DecayJobRecord, DecayJobRepo};
42use cortex_store::Pool;
43use serde_json::{json, Value};
44use tracing::info;
45
46use crate::tool_handler::{GateId, ToolError, ToolHandler};
47
/// Wire tokens the tool accepts for the `kind` parameter.
///
/// `call` rejects any `kind` value that is not listed here before it
/// attempts to build a typed [`DecayJobKind`].
const VALID_KINDS: &[&str] = &[
    "episode_compression",
    "candidate_compression",
    "expired_principle_review",
];
54
/// Wire tokens the tool recognises for the `summary_method` parameter.
///
/// A token being listed here only means it is syntactically recognised;
/// `call` still refuses `"llm_summary"` after this membership check.
const VALID_SUMMARY_METHODS: &[&str] = &["deterministic_concatenate", "llm_summary"];
57
58/// MCP tool: `cortex_decay_schedule`.
59#[derive(Debug)]
60pub struct CortexDecayScheduleTool {
61    pool: Arc<Mutex<Pool>>,
62}
63
64impl CortexDecayScheduleTool {
65    /// Construct the tool over a shared, mutex-guarded store connection.
66    #[must_use]
67    pub fn new(pool: Arc<Mutex<Pool>>) -> Self {
68        Self { pool }
69    }
70}
71
72impl ToolHandler for CortexDecayScheduleTool {
73    fn name(&self) -> &'static str {
74        "cortex_decay_schedule"
75    }
76
77    fn gate_set(&self) -> &'static [GateId] {
78        &[GateId::SessionWrite]
79    }
80
81    fn call(&self, params: Value) -> Result<Value, ToolError> {
82        // ── 1. Extract and validate `kind` ────────────────────────────────
83        let kind_str = params["kind"]
84            .as_str()
85            .filter(|s| !s.is_empty())
86            .ok_or_else(|| ToolError::InvalidParams("kind is required".into()))?;
87
88        if !VALID_KINDS.contains(&kind_str) {
89            return Err(ToolError::InvalidParams(format!(
90                "kind must be one of {VALID_KINDS:?}, got `{kind_str}`"
91            )));
92        }
93
94        // ── 2. Extract and validate `summary_method` ──────────────────────
95        let summary_method_str = params["summary_method"]
96            .as_str()
97            .filter(|s| !s.is_empty())
98            .ok_or_else(|| ToolError::InvalidParams("summary_method is required".into()))?;
99
100        if !VALID_SUMMARY_METHODS.contains(&summary_method_str) {
101            return Err(ToolError::InvalidParams(format!(
102                "summary_method must be one of {VALID_SUMMARY_METHODS:?}, got `{summary_method_str}`"
103            )));
104        }
105
106        // LLM-summary jobs require an operator attestation file path that
107        // cannot be supplied via the MCP transport. Refuse fail-closed.
108        if summary_method_str == "llm_summary" {
109            return Err(ToolError::InvalidParams(
110                "summary_method `llm_summary` is not supported via MCP; \
111                 use `cortex decay schedule --summary-method llm` via the CLI \
112                 to supply an operator attestation"
113                    .into(),
114            ));
115        }
116
117        // ── 3. Extract and validate `source_ids` ─────────────────────────
118        let source_ids_arr = params["source_ids"]
119            .as_array()
120            .ok_or_else(|| ToolError::InvalidParams("source_ids must be a JSON array".into()))?;
121
122        // ── 4. Parse `scheduled_for` ──────────────────────────────────────
123        let now = Utc::now();
124        let scheduled_for = match params["scheduled_for"].as_str() {
125            None | Some("") => now,
126            Some(raw) => DateTime::parse_from_rfc3339(raw)
127                .map(|ts| ts.with_timezone(&Utc))
128                .map_err(|err| {
129                    ToolError::InvalidParams(format!(
130                        "scheduled_for `{raw}` is not a valid RFC3339 timestamp: {err}"
131                    ))
132                })?,
133        };
134
135        // ── 5. Build the typed kind ───────────────────────────────────────
136        let summary_method = SummaryMethod::DeterministicConcatenate;
137
138        let kind = match kind_str {
139            "episode_compression" => {
140                let ids = parse_source_ids::<EpisodeId>(source_ids_arr, "source_ids")?;
141                DecayJobKind::EpisodeCompression {
142                    source_episode_ids: ids,
143                    summary_method,
144                }
145            }
146            "candidate_compression" => {
147                let ids = parse_source_ids::<MemoryId>(source_ids_arr, "source_ids")?;
148                DecayJobKind::CandidateCompression {
149                    source_memory_ids: ids,
150                    summary_method,
151                }
152            }
153            "expired_principle_review" => {
154                if source_ids_arr.len() != 1 {
155                    return Err(ToolError::InvalidParams(format!(
156                        "expired_principle_review requires exactly one principle id in source_ids, \
157                         got {}",
158                        source_ids_arr.len()
159                    )));
160                }
161                let raw = source_ids_arr[0].as_str().ok_or_else(|| {
162                    ToolError::InvalidParams("source_ids entries must be strings".into())
163                })?;
164                let principle_id = raw.parse::<PrincipleId>().map_err(|err| {
165                    ToolError::InvalidParams(format!(
166                        "source_ids[0] `{raw}` is not a valid principle id: {err}"
167                    ))
168                })?;
169                DecayJobKind::ExpiredPrincipleReview { principle_id }
170            }
171            _ => unreachable!("kind validated against VALID_KINDS above"),
172        };
173
174        // ── 6. Build the record and insert ────────────────────────────────
175        let job_id = DecayJobId::new();
176        let job = DecayJob {
177            id: job_id,
178            kind,
179            state: DecayJobState::Pending,
180            scheduled_for,
181            created_at: now,
182            created_by: "operator:mcp".to_owned(),
183            updated_at: now,
184        };
185
186        let record: DecayJobRecord = job.into();
187
188        info!(
189            job_id = %record.id,
190            kind = %record.kind_wire,
191            summary_method = %record.summary_method_wire,
192            scheduled_for = %record.scheduled_for.to_rfc3339(),
193            "cortex_decay_schedule via MCP"
194        );
195
196        let pool = self
197            .pool
198            .lock()
199            .map_err(|err| ToolError::Internal(format!("pool lock poisoned: {err}")))?;
200        DecayJobRepo::new(&pool)
201            .insert(&record)
202            .map_err(|err| ToolError::Internal(format!("failed to insert decay job: {err}")))?;
203
204        // ── 7. Return the stable schema ───────────────────────────────────
205        Ok(json!({
206            "job_id":        record.id.to_string(),
207            "kind":          record.kind_wire,
208            "state":         "pending",
209            "scheduled_for": record.scheduled_for.to_rfc3339(),
210        }))
211    }
212}
213
214/// Parse a JSON array of strings into typed ids.
215fn parse_source_ids<T>(arr: &[Value], field: &str) -> Result<Vec<T>, ToolError>
216where
217    T: std::str::FromStr,
218    T::Err: std::fmt::Display,
219{
220    if arr.is_empty() {
221        return Err(ToolError::InvalidParams(format!(
222            "{field} must contain at least one id"
223        )));
224    }
225    arr.iter()
226        .enumerate()
227        .map(|(i, v)| {
228            let raw = v.as_str().ok_or_else(|| {
229                ToolError::InvalidParams(format!("{field}[{i}] must be a string"))
230            })?;
231            raw.parse::<T>().map_err(|err| {
232                ToolError::InvalidParams(format!("{field}[{i}] `{raw}` is not a valid id: {err}"))
233            })
234        })
235        .collect()
236}
237
238/// Suppress unused-import warning: `SUMMARY_METHOD_NONE_WIRE` is brought in
239/// via the `use` above to keep the import list consistent with `decay.rs` in
240/// the CLI, but the schedule tool never writes `"none"` directly (the
241/// `DecayJob -> DecayJobRecord` `From` impl handles that for
242/// `ExpiredPrincipleReview`).
243#[allow(dead_code)]
244const _NONE_WIRE_USED_BY_FROM_IMPL: &str = SUMMARY_METHOD_NONE_WIRE;