use std::sync::{Arc, Mutex};
use chrono::Utc;
use cortex_core::DecayJobId;
use cortex_llm::NoopSummaryBackend;
use cortex_memory::decay::runner::{run_next_pending_job, run_specific_job};
use cortex_store::repo::DecayJobRepo;
use cortex_store::Pool;
use serde_json::{json, Value};
use tracing::info;
use crate::tool_handler::{GateId, ToolError, ToolHandler};
/// Tool handler exposed under the name `cortex_decay_run`.
///
/// Dispatches a single decay job (either a specific job id or the next
/// pending one) against the shared store pool.
#[derive(Debug)]
pub struct CortexDecayRunTool {
/// Store pool shared behind a mutex; `call` holds the lock for the whole
/// preview + dispatch + re-read sequence.
pool: Arc<Mutex<Pool>>,
}
impl CortexDecayRunTool {
/// Builds the tool around an already-shared, mutex-guarded [`Pool`].
#[must_use]
pub fn new(pool: Arc<Mutex<Pool>>) -> Self {
Self { pool }
}
}
impl ToolHandler for CortexDecayRunTool {
fn name(&self) -> &'static str {
"cortex_decay_run"
}
fn gate_set(&self) -> &'static [GateId] {
&[GateId::SessionWrite]
}
fn call(&self, params: Value) -> Result<Value, ToolError> {
let job_id_str = params["job_id"].as_str().filter(|s| !s.is_empty());
let next_pending = params["next_pending"].as_bool().unwrap_or(false);
if job_id_str.is_none() && !next_pending {
return Err(ToolError::InvalidParams(
"one of job_id or next_pending must be supplied".into(),
));
}
if job_id_str.is_some() && next_pending {
return Err(ToolError::InvalidParams(
"job_id and next_pending are mutually exclusive".into(),
));
}
let target_id: Option<DecayJobId> = match job_id_str {
None => None,
Some(raw) => {
let id = raw.parse::<DecayJobId>().map_err(|err| {
ToolError::InvalidParams(format!(
"job_id `{raw}` is not a valid decay job id: {err}"
))
})?;
Some(id)
}
};
let now = Utc::now();
let pool = self
.pool
.lock()
.map_err(|err| ToolError::Internal(format!("pool lock poisoned: {err}")))?;
let preview_id: DecayJobId = match &target_id {
Some(id) => {
let repo = DecayJobRepo::new(&pool);
match repo.read(id) {
Ok(Some(rec)) => {
if rec.summary_method_wire == "llm_summary" {
return Err(ToolError::InvalidParams(format!(
"job `{id}` declares summary_method=llm_summary; \
use `cortex decay run --job-id {id} \
--operator-attestation <PATH>` via the CLI"
)));
}
rec.id
}
Ok(None) => {
return Err(ToolError::InvalidParams(format!(
"decay job `{id}` not found"
)));
}
Err(err) => {
return Err(ToolError::Internal(format!(
"failed to load decay job `{id}`: {err}"
)));
}
}
}
None => {
let repo = DecayJobRepo::new(&pool);
let pending = repo.list_pending_ready(now).map_err(|err| {
ToolError::Internal(format!("failed to scan pending queue: {err}"))
})?;
let rec = pending.into_iter().next().ok_or_else(|| {
ToolError::InvalidParams(
"no pending decay jobs whose scheduled_for is in the past".into(),
)
})?;
if rec.summary_method_wire == "llm_summary" {
return Err(ToolError::InvalidParams(
"next pending job declares summary_method=llm_summary; \
use `cortex decay run --next-pending \
--operator-attestation <PATH>` via the CLI"
.into(),
));
}
rec.id
}
};
info!(
job_id = %preview_id,
mode = if target_id.is_some() { "specific" } else { "next_pending" },
"cortex_decay_run via MCP"
);
let runner_result = if let Some(id) = &target_id {
run_specific_job(&pool, id, now, &NoopSummaryBackend)
} else {
match run_next_pending_job(&pool, now, &NoopSummaryBackend) {
Ok(Some(_)) => Ok(()),
Ok(None) => {
return Err(ToolError::InvalidParams(
"no pending decay jobs whose scheduled_for is in the past".into(),
));
}
Err(err) => Err(err),
}
};
let repo = DecayJobRepo::new(&pool);
let post = repo.read(&preview_id).map_err(|err| {
ToolError::Internal(format!(
"failed to re-read decay job `{preview_id}` after dispatch: {err}"
))
})?;
let post = post.ok_or_else(|| {
ToolError::Internal(format!(
"decay job `{preview_id}` disappeared after dispatch (substrate drift)"
))
})?;
let outcome = match runner_result {
Ok(()) => "completed",
Err(_) => "failed",
};
Ok(json!({
"job_id": post.id.to_string(),
"outcome": outcome,
"state_reason": post.state_reason,
}))
}
}