cortex_mcp/tools/
decay_schedule.rs1use std::sync::{Arc, Mutex};
35
36use chrono::{DateTime, Utc};
37use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
38use cortex_memory::decay::{
39 DecayJob, DecayJobKind, DecayJobState, SummaryMethod, SUMMARY_METHOD_NONE_WIRE,
40};
41use cortex_store::repo::{DecayJobRecord, DecayJobRepo};
42use cortex_store::Pool;
43use serde_json::{json, Value};
44use tracing::info;
45
46use crate::tool_handler::{GateId, ToolError, ToolHandler};
47
48const VALID_KINDS: &[&str] = &[
50 "episode_compression",
51 "candidate_compression",
52 "expired_principle_review",
53];
54
55const VALID_SUMMARY_METHODS: &[&str] = &["deterministic_concatenate", "llm_summary"];
57
58#[derive(Debug)]
60pub struct CortexDecayScheduleTool {
61 pool: Arc<Mutex<Pool>>,
62}
63
64impl CortexDecayScheduleTool {
65 #[must_use]
67 pub fn new(pool: Arc<Mutex<Pool>>) -> Self {
68 Self { pool }
69 }
70}
71
72impl ToolHandler for CortexDecayScheduleTool {
73 fn name(&self) -> &'static str {
74 "cortex_decay_schedule"
75 }
76
77 fn gate_set(&self) -> &'static [GateId] {
78 &[GateId::SessionWrite]
79 }
80
81 fn call(&self, params: Value) -> Result<Value, ToolError> {
82 let kind_str = params["kind"]
84 .as_str()
85 .filter(|s| !s.is_empty())
86 .ok_or_else(|| ToolError::InvalidParams("kind is required".into()))?;
87
88 if !VALID_KINDS.contains(&kind_str) {
89 return Err(ToolError::InvalidParams(format!(
90 "kind must be one of {VALID_KINDS:?}, got `{kind_str}`"
91 )));
92 }
93
94 let summary_method_str = params["summary_method"]
96 .as_str()
97 .filter(|s| !s.is_empty())
98 .ok_or_else(|| ToolError::InvalidParams("summary_method is required".into()))?;
99
100 if !VALID_SUMMARY_METHODS.contains(&summary_method_str) {
101 return Err(ToolError::InvalidParams(format!(
102 "summary_method must be one of {VALID_SUMMARY_METHODS:?}, got `{summary_method_str}`"
103 )));
104 }
105
106 if summary_method_str == "llm_summary" {
109 return Err(ToolError::InvalidParams(
110 "summary_method `llm_summary` is not supported via MCP; \
111 use `cortex decay schedule --summary-method llm` via the CLI \
112 to supply an operator attestation"
113 .into(),
114 ));
115 }
116
117 let source_ids_arr = params["source_ids"]
119 .as_array()
120 .ok_or_else(|| ToolError::InvalidParams("source_ids must be a JSON array".into()))?;
121
122 let now = Utc::now();
124 let scheduled_for = match params["scheduled_for"].as_str() {
125 None | Some("") => now,
126 Some(raw) => DateTime::parse_from_rfc3339(raw)
127 .map(|ts| ts.with_timezone(&Utc))
128 .map_err(|err| {
129 ToolError::InvalidParams(format!(
130 "scheduled_for `{raw}` is not a valid RFC3339 timestamp: {err}"
131 ))
132 })?,
133 };
134
135 let summary_method = SummaryMethod::DeterministicConcatenate;
137
138 let kind = match kind_str {
139 "episode_compression" => {
140 let ids = parse_source_ids::<EpisodeId>(source_ids_arr, "source_ids")?;
141 DecayJobKind::EpisodeCompression {
142 source_episode_ids: ids,
143 summary_method,
144 }
145 }
146 "candidate_compression" => {
147 let ids = parse_source_ids::<MemoryId>(source_ids_arr, "source_ids")?;
148 DecayJobKind::CandidateCompression {
149 source_memory_ids: ids,
150 summary_method,
151 }
152 }
153 "expired_principle_review" => {
154 if source_ids_arr.len() != 1 {
155 return Err(ToolError::InvalidParams(format!(
156 "expired_principle_review requires exactly one principle id in source_ids, \
157 got {}",
158 source_ids_arr.len()
159 )));
160 }
161 let raw = source_ids_arr[0].as_str().ok_or_else(|| {
162 ToolError::InvalidParams("source_ids entries must be strings".into())
163 })?;
164 let principle_id = raw.parse::<PrincipleId>().map_err(|err| {
165 ToolError::InvalidParams(format!(
166 "source_ids[0] `{raw}` is not a valid principle id: {err}"
167 ))
168 })?;
169 DecayJobKind::ExpiredPrincipleReview { principle_id }
170 }
171 _ => unreachable!("kind validated against VALID_KINDS above"),
172 };
173
174 let job_id = DecayJobId::new();
176 let job = DecayJob {
177 id: job_id,
178 kind,
179 state: DecayJobState::Pending,
180 scheduled_for,
181 created_at: now,
182 created_by: "operator:mcp".to_owned(),
183 updated_at: now,
184 };
185
186 let record: DecayJobRecord = job.into();
187
188 info!(
189 job_id = %record.id,
190 kind = %record.kind_wire,
191 summary_method = %record.summary_method_wire,
192 scheduled_for = %record.scheduled_for.to_rfc3339(),
193 "cortex_decay_schedule via MCP"
194 );
195
196 let pool = self
197 .pool
198 .lock()
199 .map_err(|err| ToolError::Internal(format!("pool lock poisoned: {err}")))?;
200 DecayJobRepo::new(&pool)
201 .insert(&record)
202 .map_err(|err| ToolError::Internal(format!("failed to insert decay job: {err}")))?;
203
204 Ok(json!({
206 "job_id": record.id.to_string(),
207 "kind": record.kind_wire,
208 "state": "pending",
209 "scheduled_for": record.scheduled_for.to_rfc3339(),
210 }))
211 }
212}
213
214fn parse_source_ids<T>(arr: &[Value], field: &str) -> Result<Vec<T>, ToolError>
216where
217 T: std::str::FromStr,
218 T::Err: std::fmt::Display,
219{
220 if arr.is_empty() {
221 return Err(ToolError::InvalidParams(format!(
222 "{field} must contain at least one id"
223 )));
224 }
225 arr.iter()
226 .enumerate()
227 .map(|(i, v)| {
228 let raw = v.as_str().ok_or_else(|| {
229 ToolError::InvalidParams(format!("{field}[{i}] must be a string"))
230 })?;
231 raw.parse::<T>().map_err(|err| {
232 ToolError::InvalidParams(format!("{field}[{i}] `{raw}` is not a valid id: {err}"))
233 })
234 })
235 .collect()
236}
237
238#[allow(dead_code)]
244const _NONE_WIRE_USED_BY_FROM_IMPL: &str = SUMMARY_METHOD_NONE_WIRE;