use std::path::PathBuf;
use std::str::FromStr;
use chrono::{DateTime, Utc};
use clap::{Args, Subcommand, ValueEnum};
use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
use cortex_llm::{
ClaudeSummaryBackend, NoopSummaryBackend, OllamaConfig, OllamaSummaryBackend,
ReplaySummaryBackend, SummaryBackend,
};
use crate::config::LlmBackend;
use cortex_memory::decay::runner::{
run_next_pending_job_with_attestation, run_specific_job_with_attestation,
};
use cortex_memory::decay::{DecayError, DecayJob, DecayJobKind, DecayJobState, SummaryMethod};
use cortex_store::repo::{DecayJobRecord, DecayJobRepo};
use serde::Serialize;
use serde_json::Value;
use crate::cmd::open_default_store;
use crate::exit::Exit;
use crate::output::{self, Envelope, Outcome};
// Invariant identifiers carried in refusal envelopes (see `refuse`); these are
// stable wire strings consumed by operators/automation — do not rename casually.
pub const SCHEDULE_FOR_MUST_BE_PRESENT_OR_FUTURE: &str =
    "decay.schedule.scheduled_for_must_be_present_or_future";
pub const RUN_NO_PENDING_JOBS: &str = "decay.run.no_pending_jobs";
pub const CANCEL_TERMINAL_STATE: &str = "decay.cancel.terminal_state";
pub const RUN_OPERATOR_ATTESTATION_REQUIRED_FOR_LLM: &str =
    "decay.run.operator_attestation_required_for_llm";
pub const RUN_SUMMARY_BACKEND_FIXTURE_INVALID: &str = "decay.run.summary_backend_fixture_invalid";
pub const RUN_CLAUDE_SUMMARY_BACKEND_NOT_CONFIGURED: &str =
    "decay.run.claude_summary_backend_not_configured";
// NOTE(review): this value reads `decay.ollama_summary.backend_not_configured`,
// breaking the `decay.run.*` pattern of every other run invariant above —
// confirm whether `decay.run.ollama_summary_backend_not_configured` was
// intended before anything starts matching on it.
pub const RUN_OLLAMA_SUMMARY_BACKEND_NOT_CONFIGURED: &str =
    "decay.ollama_summary.backend_not_configured";
// Subcommands under `cortex decay`. Plain `//` comments are used here (and on
// the other clap types) so rustdoc text does not leak into `--help` output.
#[derive(Debug, Subcommand)]
pub enum DecaySub {
    // Insert a new decay job into the queue.
    Schedule(ScheduleArgs),
    // Execute a specific job or the next pending one.
    Run(RunArgs),
    // List jobs, optionally filtered by state.
    List(ListArgs),
    // Cancel a non-terminal job.
    Cancel(CancelArgs),
    // Show one job, including superseded source ids.
    Status(StatusArgs),
}
// Arguments for `cortex decay schedule`. Exactly one id-source flag is valid
// per `--kind`; `build_kind` enforces the pairing.
#[derive(Debug, Args)]
pub struct ScheduleArgs {
    // Which compression/review job to create.
    #[arg(long, value_enum)]
    pub kind: KindFlag,
    // Comma-separated episode ids (episode-compression only).
    #[arg(long, value_name = "ID,ID,...")]
    pub episode_ids: Option<String>,
    // Comma-separated memory ids (candidate-compression only).
    #[arg(long, value_name = "ID,ID,...")]
    pub memory_ids: Option<String>,
    // Single principle id (expired-principle-review only).
    #[arg(long, value_name = "ID")]
    pub principle_id: Option<String>,
    // How the summary is produced; `llm` additionally requires
    // --operator-attestation (checked in `run_schedule`).
    #[arg(long, value_enum, default_value = "deterministic-concatenate")]
    pub summary_method: SummaryMethodFlag,
    // When the job becomes runnable; defaults to "now" when omitted/blank.
    #[arg(long, value_name = "RFC3339")]
    pub scheduled_for: Option<String>,
    // Attestation file path; presence is required for `--summary-method llm`.
    #[arg(long, value_name = "PATH")]
    pub operator_attestation: Option<PathBuf>,
    // Principal recorded as the job creator.
    #[arg(long, value_name = "PRINCIPAL", default_value = "operator:cli")]
    pub created_by: String,
    // Opt-in override allowing a --scheduled-for in the past.
    #[arg(long)]
    pub run_immediately: bool,
}
// Arguments for `cortex decay run`: execute one specific job or the next
// pending job, with an optional summary-backend override.
#[derive(Debug, Args)]
pub struct RunArgs {
    // Run exactly this job (mutually exclusive with --next-pending).
    #[arg(long, value_name = "ID", conflicts_with = "next_pending")]
    pub job_id: Option<String>,
    // Run the first pending job whose scheduled_for has passed.
    #[arg(long, conflicts_with = "job_id")]
    pub next_pending: bool,
    // Attestation file; required whenever the job declares llm_summary.
    #[arg(long, value_name = "PATH")]
    pub operator_attestation: Option<PathBuf>,
    // Replay fixture used as the summary backend (checked first in `run_run`).
    #[arg(long, value_name = "PATH")]
    pub summary_backend_fixture: Option<PathBuf>,
    // NOTE(review): --ollama-summary conflicts with both --claude-summary and
    // --summary-backend-fixture, but --claude-summary does NOT conflict with
    // --summary-backend-fixture; when both are passed, `run_run`'s backend
    // selection silently prefers the fixture. Confirm the asymmetry is
    // intentional.
    #[arg(long, conflicts_with = "ollama_summary")]
    pub claude_summary: bool,
    #[arg(long, conflicts_with_all = ["claude_summary", "summary_backend_fixture"])]
    pub ollama_summary: bool,
}
// Arguments for `cortex decay list`. Repeatable; an empty filter means
// "all states".
#[derive(Debug, Args)]
pub struct ListArgs {
    #[arg(long, value_enum)]
    pub state: Vec<StateFlag>,
}
// Arguments for `cortex decay cancel <JOB_ID> --reason <TEXT>`.
#[derive(Debug, Args)]
pub struct CancelArgs {
    // Positional job id to cancel.
    #[arg(value_name = "JOB_ID")]
    pub job_id: String,
    // Required human-readable cancellation reason (must be non-blank).
    #[arg(long, value_name = "TEXT")]
    pub reason: String,
}
// Arguments for `cortex decay status <JOB_ID>`.
#[derive(Debug, Args)]
pub struct StatusArgs {
    #[arg(value_name = "JOB_ID")]
    pub job_id: String,
}
// CLI spelling of decay job kinds (kebab-case on the command line; see
// `KindFlag::wire` for the snake_case substrate names).
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum KindFlag {
    #[value(name = "episode-compression")]
    EpisodeCompression,
    #[value(name = "candidate-compression")]
    CandidateCompression,
    #[value(name = "expired-principle-review")]
    ExpiredPrincipleReview,
}
impl KindFlag {
#[allow(dead_code)]
fn wire(self) -> &'static str {
match self {
Self::EpisodeCompression => "episode_compression",
Self::CandidateCompression => "candidate_compression",
Self::ExpiredPrincipleReview => "expired_principle_review",
}
}
}
// CLI spelling of the summary method used by compression jobs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum SummaryMethodFlag {
    #[value(name = "deterministic-concatenate")]
    DeterministicConcatenate,
    Llm,
}
impl SummaryMethodFlag {
    /// Convert the CLI flag into the substrate `SummaryMethod`. The LLM
    /// variant always demands an operator attestation.
    fn to_summary_method(self) -> SummaryMethod {
        if let Self::DeterministicConcatenate = self {
            SummaryMethod::DeterministicConcatenate
        } else {
            SummaryMethod::LlmSummary {
                operator_attestation_required: true,
            }
        }
    }
}
// CLI spelling of job states accepted by `--state` on `decay list`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
pub enum StateFlag {
    Pending,
    #[value(name = "in-progress")]
    InProgress,
    Completed,
    Failed,
    Cancelled,
}
impl StateFlag {
fn wire(self) -> &'static str {
match self {
Self::Pending => "pending",
Self::InProgress => "in_progress",
Self::Completed => "completed",
Self::Failed => "failed",
Self::Cancelled => "cancelled",
}
}
}
/// Serializable projection of a decay job row for envelopes and human output.
/// All timestamps are rendered as RFC 3339 strings.
#[derive(Debug, Clone, Serialize)]
struct JobView {
    id: String,
    kind: String,
    summary_method: String,
    state: String,
    // Raw JSON of the job's source ids, copied through untouched.
    source_ids: Value,
    state_reason: Option<String>,
    result_memory_id: Option<String>,
    scheduled_for: String,
    created_at: String,
    created_by: String,
    updated_at: String,
    // Populated only by `run_status` (empty after `from_record`).
    superseded_memory_ids: Vec<String>,
    superseded_episode_ids: Vec<String>,
}
impl JobView {
    /// Build a view from a store record. The superseded-id lists start empty;
    /// `run_status` fills them in from the repo's source lookups.
    fn from_record(record: &DecayJobRecord) -> Self {
        Self {
            id: record.id.to_string(),
            kind: record.kind_wire.clone(),
            summary_method: record.summary_method_wire.clone(),
            state: record.state_wire.clone(),
            source_ids: record.source_ids_json.clone(),
            state_reason: record.state_reason.clone(),
            result_memory_id: record.result_memory_id.as_ref().map(ToString::to_string),
            scheduled_for: record.scheduled_for.to_rfc3339(),
            created_at: record.created_at.to_rfc3339(),
            created_by: record.created_by.clone(),
            updated_at: record.updated_at.to_rfc3339(),
            superseded_memory_ids: Vec::new(),
            superseded_episode_ids: Vec::new(),
        }
    }
}
/// JSON payload of a successful `cortex.decay.schedule` envelope.
#[derive(Debug, Serialize)]
struct ScheduleReport {
    job_id: String,
    kind: String,
    summary_method: String,
    state: String,
    scheduled_for: String,
    source_ids: Value,
    created_by: String,
}
/// JSON payload of a `cortex.decay.run` envelope: the observed state
/// transition plus any result/reason the runner recorded.
#[derive(Debug, Serialize)]
struct RunReport {
    job_id: String,
    kind: String,
    summary_method: String,
    from_state: String,
    to_state: String,
    result_memory_id: Option<String>,
    state_reason: Option<String>,
}
/// JSON payload of a `cortex.decay.cancel` envelope.
#[derive(Debug, Serialize)]
struct CancelReport {
    job_id: String,
    from_state: String,
    to_state: String,
    reason: String,
}
/// JSON payload of a `cortex.decay.list` envelope.
#[derive(Debug, Serialize)]
struct ListReport {
    state_filter: Vec<String>,
    job_count: usize,
    jobs: Vec<JobView>,
}
/// JSON payload attached to refusal envelopes emitted by `refuse`.
#[derive(Debug, Serialize)]
struct RefusalReport {
    invariant: &'static str,
    reason: String,
}
/// Entry point for `cortex decay <sub>`: dispatch to the handler for the
/// chosen subcommand and return its process exit code.
pub fn run(sub: DecaySub) -> Exit {
    match sub {
        DecaySub::Schedule(args) => run_schedule(args),
        DecaySub::Run(args) => run_run(args),
        DecaySub::List(args) => run_list(args),
        DecaySub::Cancel(args) => run_cancel(args),
        DecaySub::Status(args) => run_status(args),
    }
}
/// Handle `cortex decay schedule`: validate the arguments, insert a pending
/// job row, and report it (JSON envelope or plain text).
fn run_schedule(args: ScheduleArgs) -> Exit {
    let now = Utc::now();
    let scheduled_for = match parse_scheduled_for(&args.scheduled_for, now) {
        Ok(ts) => ts,
        Err(exit) => return exit,
    };
    // Refuse past timestamps unless the operator explicitly opts in.
    // NOTE(review): --run-immediately only waives this refusal; the job is
    // still inserted as pending and is NOT executed here — confirm the flag
    // name matches the intended semantics.
    if scheduled_for < now && !args.run_immediately {
        return refuse(
            "cortex.decay.schedule",
            Exit::PreconditionUnmet,
            SCHEDULE_FOR_MUST_BE_PRESENT_OR_FUTURE,
            format!(
                "--scheduled-for `{}` is in the past (now=`{}`). Pass --run-immediately to opt in.",
                scheduled_for.to_rfc3339(),
                now.to_rfc3339()
            ),
        );
    }
    // LLM summaries require an attestation path at schedule time.
    // NOTE(review): only the flag's presence is checked; the path is not
    // stored on the job below — confirm `decay run` re-collects it.
    if matches!(args.summary_method, SummaryMethodFlag::Llm) && args.operator_attestation.is_none()
    {
        return refuse(
            "cortex.decay.schedule",
            Exit::PreconditionUnmet,
            RUN_OPERATOR_ATTESTATION_REQUIRED_FOR_LLM,
            "scheduling a --summary-method llm job requires --operator-attestation <PATH>".into(),
        );
    }
    // Pair the kind with its id flags (usage error on mismatch).
    let kind = match build_kind(&args) {
        Ok(kind) => kind,
        Err(message) => {
            eprintln!("cortex decay schedule: {message}");
            return Exit::Usage;
        }
    };
    let pool = match open_default_store("decay schedule") {
        Ok(pool) => pool,
        Err(exit) => return exit,
    };
    let job_id = DecayJobId::new();
    let job = DecayJob {
        id: job_id,
        kind: kind.clone(),
        state: DecayJobState::Pending,
        scheduled_for,
        created_at: now,
        created_by: args.created_by.clone(),
        updated_at: now,
    };
    // Persist via the wire-format record representation.
    let record: DecayJobRecord = job.into();
    if let Err(err) = DecayJobRepo::new(&pool).insert(&record) {
        eprintln!("cortex decay schedule: failed to insert decay job: {err}");
        return Exit::Internal;
    }
    let report = ScheduleReport {
        job_id: record.id.to_string(),
        kind: record.kind_wire.clone(),
        summary_method: record.summary_method_wire.clone(),
        state: record.state_wire.clone(),
        scheduled_for: record.scheduled_for.to_rfc3339(),
        source_ids: record.source_ids_json.clone(),
        created_by: record.created_by.clone(),
    };
    if output::json_enabled() {
        let envelope = Envelope::new("cortex.decay.schedule", Exit::Ok, report);
        output::emit(&envelope, Exit::Ok)
    } else {
        println!("cortex decay schedule: ok");
        println!("job_id={}", report.job_id);
        println!("kind={}", report.kind);
        println!("summary_method={}", report.summary_method);
        println!("state={}", report.state);
        println!("scheduled_for={}", report.scheduled_for);
        Exit::Ok
    }
}
/// Parse the optional `--scheduled-for` value. A missing or blank value means
/// "now"; anything else must be RFC 3339 (normalized to UTC) or the command
/// fails with a usage error.
fn parse_scheduled_for(value: &Option<String>, now: DateTime<Utc>) -> Result<DateTime<Utc>, Exit> {
    let trimmed = match value.as_deref().map(str::trim) {
        Some(t) if !t.is_empty() => t,
        _ => return Ok(now),
    };
    DateTime::parse_from_rfc3339(trimmed)
        .map(|ts| ts.with_timezone(&Utc))
        .map_err(|err| {
            eprintln!(
                "cortex decay schedule: --scheduled-for `{trimmed}` is not a valid RFC3339 timestamp: {err}"
            );
            Exit::Usage
        })
}
/// Translate schedule arguments into a `DecayJobKind`, enforcing that exactly
/// the id flags matching `--kind` were supplied. Returns a usage-error message
/// on any mismatch.
fn build_kind(args: &ScheduleArgs) -> Result<DecayJobKind, String> {
    let summary_method = args.summary_method.to_summary_method();
    match args.kind {
        KindFlag::EpisodeCompression => {
            // Reject id flags belonging to the other kinds.
            if args.memory_ids.is_some() || args.principle_id.is_some() {
                return Err(
                    "--kind episode-compression takes --episode-ids, not --memory-ids or --principle-id"
                        .into(),
                );
            }
            let ids = parse_typed_id_list::<EpisodeId>(&args.episode_ids, "--episode-ids")?;
            Ok(DecayJobKind::EpisodeCompression {
                source_episode_ids: ids,
                summary_method,
            })
        }
        KindFlag::CandidateCompression => {
            if args.episode_ids.is_some() || args.principle_id.is_some() {
                return Err(
                    "--kind candidate-compression takes --memory-ids, not --episode-ids or --principle-id"
                        .into(),
                );
            }
            let ids = parse_typed_id_list::<MemoryId>(&args.memory_ids, "--memory-ids")?;
            Ok(DecayJobKind::CandidateCompression {
                source_memory_ids: ids,
                summary_method,
            })
        }
        KindFlag::ExpiredPrincipleReview => {
            if args.episode_ids.is_some() || args.memory_ids.is_some() {
                return Err(
                    "--kind expired-principle-review takes --principle-id, not --episode-ids or --memory-ids"
                        .into(),
                );
            }
            // This kind carries no summary method, so an explicit llm choice
            // is a contradiction rather than something to silently drop.
            if matches!(args.summary_method, SummaryMethodFlag::Llm) {
                return Err(
                    "--kind expired-principle-review does not carry a summary method; drop --summary-method llm"
                        .into(),
                );
            }
            // Blank/whitespace-only values count as missing.
            let raw = args
                .principle_id
                .as_deref()
                .map(str::trim)
                .filter(|s| !s.is_empty())
                .ok_or_else(|| {
                    "--principle-id is required for --kind expired-principle-review".to_string()
                })?;
            let principle_id = raw.parse::<PrincipleId>().map_err(|err| {
                format!("--principle-id `{raw}` is not a valid principle id: {err}")
            })?;
            Ok(DecayJobKind::ExpiredPrincipleReview { principle_id })
        }
    }
}
/// Parse a comma-separated id list into typed ids. Blank segments are skipped;
/// a missing flag, an effectively empty list, or any unparsable entry yields a
/// usage-error message naming `flag`.
fn parse_typed_id_list<T: FromStr>(value: &Option<String>, flag: &str) -> Result<Vec<T>, String>
where
    T::Err: std::fmt::Display,
{
    let raw = value
        .as_deref()
        .ok_or_else(|| format!("{flag} is required (comma-separated identifiers)"))?;
    let ids: Vec<T> = raw
        .split(',')
        .map(str::trim)
        .filter(|s| !s.is_empty())
        .map(|part| {
            part.parse::<T>()
                .map_err(|err| format!("{flag} entry `{part}` is not a valid id: {err}"))
        })
        .collect::<Result<_, _>>()?;
    if ids.is_empty() {
        return Err(format!("{flag} must contain at least one identifier"));
    }
    Ok(ids)
}
/// Handle `cortex decay run`: resolve the target job (explicit id or first
/// ready pending), enforce the LLM attestation gate, pick a summary backend,
/// dispatch to the runner, then re-read the row and report the transition.
fn run_run(args: RunArgs) -> Exit {
    // clap only enforces mutual exclusion; "neither" must be caught here.
    if args.job_id.is_none() && !args.next_pending {
        eprintln!("cortex decay run: pass either --job-id <ID> or --next-pending; both omitted");
        return Exit::Usage;
    }
    let pool = match open_default_store("decay run") {
        Ok(pool) => pool,
        Err(exit) => return exit,
    };
    let repo = DecayJobRepo::new(&pool);
    let target_id: Option<DecayJobId> = match args.job_id.as_deref() {
        Some(raw) => match raw.parse::<DecayJobId>() {
            Ok(id) => Some(id),
            Err(err) => {
                eprintln!("cortex decay run: job id `{raw}` is not a valid decay job id: {err}");
                return Exit::Usage;
            }
        },
        None => None,
    };
    let now = Utc::now();
    // Load a preview row up front so refusals (attestation, empty queue) can
    // be issued before any state is touched.
    let preview = match target_id.as_ref() {
        Some(id) => match repo.read(id) {
            Ok(Some(rec)) => Some(rec),
            Ok(None) => {
                eprintln!("cortex decay run: job `{id}` not found");
                return Exit::PreconditionUnmet;
            }
            Err(err) => {
                eprintln!("cortex decay run: failed to load job `{id}`: {err}");
                return Exit::Internal;
            }
        },
        None => match repo.list_pending_ready(now) {
            Ok(rows) => match rows.into_iter().next() {
                Some(row) => Some(row),
                None => {
                    return refuse(
                        "cortex.decay.run",
                        Exit::PreconditionUnmet,
                        RUN_NO_PENDING_JOBS,
                        "no pending decay jobs whose scheduled_for is in the past".into(),
                    );
                }
            },
            Err(err) => {
                eprintln!("cortex decay run: failed to scan pending queue: {err}");
                return Exit::Internal;
            }
        },
    };
    // Both arms above either return or produce Some, so this cannot panic.
    let preview = preview.expect("preview row resolved above");
    let preview_id = preview.id;
    // LLM summaries may not run without an operator attestation path.
    if preview.summary_method_wire == "llm_summary" && args.operator_attestation.is_none() {
        return refuse(
            "cortex.decay.run",
            Exit::PreconditionUnmet,
            RUN_OPERATOR_ATTESTATION_REQUIRED_FOR_LLM,
            format!(
                "job `{preview_id}` declares summary_method=llm_summary; pass --operator-attestation <PATH>",
            ),
        );
    }
    let prior_state_wire = preview.state_wire.clone();
    // Local sum type so one `&dyn SummaryBackend` can borrow whichever
    // concrete backend the flags selected.
    enum BackendStorage {
        Replay(ReplaySummaryBackend),
        Claude(ClaudeSummaryBackend),
        Ollama(OllamaSummaryBackend),
        Noop,
    }
    // Manual Debug: the wrapped backends are not required to be Debug.
    impl std::fmt::Debug for BackendStorage {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            match self {
                BackendStorage::Replay(_) => write!(f, "BackendStorage::Replay"),
                BackendStorage::Claude(_) => write!(f, "BackendStorage::Claude"),
                BackendStorage::Ollama(_) => write!(f, "BackendStorage::Ollama"),
                BackendStorage::Noop => write!(f, "BackendStorage::Noop"),
            }
        }
    }
    // Delegate summarize() to the selected variant.
    impl SummaryBackend for BackendStorage {
        fn summarize(
            &self,
            request: &cortex_llm::SummaryRequest,
        ) -> Result<cortex_llm::SummaryResponse, cortex_llm::SummaryError> {
            match self {
                BackendStorage::Replay(b) => b.summarize(request),
                BackendStorage::Claude(b) => b.summarize(request),
                BackendStorage::Ollama(b) => b.summarize(request),
                BackendStorage::Noop => NoopSummaryBackend.summarize(request),
            }
        }
    }
    // Backend precedence: fixture, then --claude-summary, then
    // --ollama-summary, then the no-op fallback.
    let backend_storage: BackendStorage = match &args.summary_backend_fixture {
        Some(path) => match ReplaySummaryBackend::from_fixture_file(path) {
            Ok(b) => BackendStorage::Replay(b),
            Err(err) => {
                return refuse(
                    "cortex.decay.run",
                    Exit::PreconditionUnmet,
                    RUN_SUMMARY_BACKEND_FIXTURE_INVALID,
                    format!(
                        "--summary-backend-fixture `{}` could not be loaded: {err}",
                        path.display()
                    ),
                );
            }
        },
        None if args.claude_summary => {
            match ClaudeSummaryBackend::new("claude-3-5-sonnet-20241022".into(), None) {
                Ok(b) => BackendStorage::Claude(b),
                Err(err) => {
                    return refuse(
                        "cortex.decay.run",
                        Exit::PreconditionUnmet,
                        RUN_CLAUDE_SUMMARY_BACKEND_NOT_CONFIGURED,
                        format!("--claude-summary: ClaudeSummaryBackend not configured: {err}"),
                    );
                }
            }
        }
        None if args.ollama_summary => {
            // Endpoint/model come from cortex.toml / env via LlmBackend.
            match LlmBackend::resolve() {
                LlmBackend::Ollama {
                    endpoint, model, ..
                } => {
                    let config = OllamaConfig::new(endpoint, model);
                    match OllamaSummaryBackend::new(config) {
                        Ok(b) => BackendStorage::Ollama(b),
                        Err(err) => {
                            return refuse(
                                "cortex.decay.run",
                                Exit::PreconditionUnmet,
                                RUN_OLLAMA_SUMMARY_BACKEND_NOT_CONFIGURED,
                                format!(
                                    "--ollama-summary: OllamaSummaryBackend not configured: {err}"
                                ),
                            );
                        }
                    }
                }
                _ => {
                    return refuse(
                        "cortex.decay.run",
                        Exit::PreconditionUnmet,
                        RUN_OLLAMA_SUMMARY_BACKEND_NOT_CONFIGURED,
                        "--ollama-summary: no Ollama backend configured; set \
                         `[llm] backend = \"ollama\"` in cortex.toml (or \
                         CORTEX_LLM_BACKEND=ollama)"
                            .to_string(),
                    );
                }
            }
        }
        None => BackendStorage::Noop,
    };
    let summary_backend: &dyn SummaryBackend = &backend_storage;
    let attestation_path = args.operator_attestation.as_deref();
    // NOTE(review): in --next-pending mode the preview above and this runner
    // call resolve the queue independently; if the queue changed in between,
    // the executed job may differ from `preview_id` and the report below
    // would describe the wrong row — confirm this window is acceptable.
    let runner_result = if let Some(id) = target_id.as_ref() {
        run_specific_job_with_attestation(&pool, id, now, attestation_path, summary_backend)
    } else {
        match run_next_pending_job_with_attestation(&pool, now, attestation_path, summary_backend) {
            Ok(Some(_)) => Ok(()),
            Ok(None) => {
                return refuse(
                    "cortex.decay.run",
                    Exit::PreconditionUnmet,
                    RUN_NO_PENDING_JOBS,
                    "no pending decay jobs whose scheduled_for is in the past".into(),
                );
            }
            Err(err) => Err(err),
        }
    };
    // Re-read the row so the report reflects whatever the runner persisted.
    let post = match repo.read(&preview_id) {
        Ok(Some(rec)) => rec,
        Ok(None) => {
            eprintln!(
                "cortex decay run: job `{preview_id}` disappeared after dispatch (substrate drift)"
            );
            return Exit::Internal;
        }
        Err(err) => {
            eprintln!("cortex decay run: failed to re-read job `{preview_id}`: {err}");
            return Exit::Internal;
        }
    };
    match runner_result {
        Ok(()) => {
            let exit = Exit::Ok;
            let report = RunReport {
                job_id: post.id.to_string(),
                kind: post.kind_wire.clone(),
                summary_method: post.summary_method_wire.clone(),
                from_state: prior_state_wire,
                to_state: post.state_wire.clone(),
                result_memory_id: post.result_memory_id.as_ref().map(ToString::to_string),
                state_reason: post.state_reason.clone(),
            };
            emit_run_envelope(report, exit)
        }
        Err(err) => {
            // Prefer the runner's named invariant; fall back to Display.
            let invariant = err
                .invariant()
                .map(str::to_string)
                .unwrap_or_else(|| err.to_string());
            eprintln!("cortex decay run: {invariant}");
            // Attestation/backend problems are operator-fixable preconditions;
            // everything else is treated as an integrity failure.
            let exit = match err {
                DecayError::LlmSummaryRequiresOperatorAttestation
                | DecayError::LlmSummaryAttestationRejected(_)
                | DecayError::LlmSummaryBackendCallFailed(_) => Exit::PreconditionUnmet,
                _ => Exit::IntegrityFailure,
            };
            let report = RunReport {
                job_id: post.id.to_string(),
                kind: post.kind_wire.clone(),
                summary_method: post.summary_method_wire.clone(),
                from_state: prior_state_wire,
                to_state: post.state_wire.clone(),
                result_memory_id: post.result_memory_id.as_ref().map(ToString::to_string),
                state_reason: post.state_reason.clone(),
            };
            emit_run_envelope(report, exit)
        }
    }
}
/// Emit a run report as a JSON envelope when JSON mode is on, otherwise as
/// key=value lines on stdout; always returns the supplied exit code.
fn emit_run_envelope(report: RunReport, exit: Exit) -> Exit {
    if output::json_enabled() {
        let envelope = Envelope::new("cortex.decay.run", exit, report);
        output::emit(&envelope, exit)
    } else {
        println!("cortex decay run: state={}", report.to_state);
        println!("job_id={}", report.job_id);
        if let Some(id) = report.result_memory_id.as_deref() {
            println!("result_memory_id={id}");
        }
        if let Some(reason) = report.state_reason.as_deref() {
            println!("state_reason={reason}");
        }
        exit
    }
}
/// Handle `cortex decay list`: gather jobs per requested state (or every
/// state when no filter is given) and report them.
fn run_list(args: ListArgs) -> Exit {
    let pool = match open_default_store("decay list") {
        Ok(pool) => pool,
        Err(exit) => return exit,
    };
    let repo = DecayJobRepo::new(&pool);
    let records = if args.state.is_empty() {
        // No filter: enumerate every known state explicitly. Keep this list
        // in sync with `StateFlag` if variants are added.
        let mut all = Vec::new();
        for wire in [
            StateFlag::Pending.wire(),
            StateFlag::InProgress.wire(),
            StateFlag::Completed.wire(),
            StateFlag::Failed.wire(),
            StateFlag::Cancelled.wire(),
        ] {
            match repo.list_by_state(wire) {
                Ok(rows) => all.extend(rows),
                Err(err) => {
                    eprintln!("cortex decay list: failed to read state `{wire}`: {err}");
                    return Exit::Internal;
                }
            }
        }
        all
    } else {
        // NOTE(review): repeating the same --state flag lists its rows twice;
        // confirm whether de-duplication is wanted.
        let mut out = Vec::new();
        for state in &args.state {
            match repo.list_by_state(state.wire()) {
                Ok(rows) => out.extend(rows),
                Err(err) => {
                    eprintln!(
                        "cortex decay list: failed to read state `{}`: {err}",
                        state.wire()
                    );
                    return Exit::Internal;
                }
            }
        }
        out
    };
    let jobs: Vec<JobView> = records.iter().map(JobView::from_record).collect();
    let report = ListReport {
        state_filter: args.state.iter().map(|s| s.wire().to_string()).collect(),
        job_count: jobs.len(),
        jobs,
    };
    if output::json_enabled() {
        let envelope = Envelope::new("cortex.decay.list", Exit::Ok, report);
        return output::emit(&envelope, Exit::Ok);
    }
    println!(
        "cortex decay list: jobs={} state_filter={}",
        report.job_count,
        if report.state_filter.is_empty() {
            "<any>".to_string()
        } else {
            report.state_filter.join(",")
        }
    );
    for job in &report.jobs {
        println!(
            "job_id={} kind={} state={} scheduled_for={}",
            job.id, job.kind, job.state, job.scheduled_for
        );
    }
    Exit::Ok
}
/// Handle `cortex decay cancel`: validate the reason and job id, refuse if the
/// job is already terminal, then transition it to `cancelled`.
fn run_cancel(args: CancelArgs) -> Exit {
    if args.reason.trim().is_empty() {
        eprintln!("cortex decay cancel: --reason must be non-empty");
        return Exit::Usage;
    }
    let job_id = match args.job_id.parse::<DecayJobId>() {
        Ok(id) => id,
        Err(err) => {
            eprintln!(
                "cortex decay cancel: job id `{}` is not a valid decay job id: {err}",
                args.job_id
            );
            return Exit::Usage;
        }
    };
    let pool = match open_default_store("decay cancel") {
        Ok(pool) => pool,
        Err(exit) => return exit,
    };
    let repo = DecayJobRepo::new(&pool);
    let record = match repo.read(&job_id) {
        Ok(Some(rec)) => rec,
        Ok(None) => {
            eprintln!("cortex decay cancel: job `{job_id}` not found");
            return Exit::PreconditionUnmet;
        }
        Err(err) => {
            eprintln!("cortex decay cancel: failed to load job `{job_id}`: {err}");
            return Exit::Internal;
        }
    };
    // Decode into the domain type so `is_terminal` can be consulted.
    let job: DecayJob = match record.clone().try_into() {
        Ok(job) => job,
        Err(err) => {
            eprintln!("cortex decay cancel: job `{job_id}` row malformed: {err}");
            return Exit::Internal;
        }
    };
    if job.state.is_terminal() {
        return refuse(
            "cortex.decay.cancel",
            Exit::PreconditionUnmet,
            CANCEL_TERMINAL_STATE,
            format!(
                "job `{}` is already in terminal state `{}`; cannot cancel a completed/failed/cancelled job",
                record.id, record.state_wire,
            ),
        );
    }
    let from_state_wire = record.state_wire.clone();
    let now = Utc::now();
    // NOTE(review): `args.reason` is validated and echoed in the report but
    // both optional arguments to `update_state` are `None` here, so the
    // cancellation reason does not appear to be persisted — confirm whether
    // one of them should carry `args.reason`.
    if let Err(err) = repo.update_state(&record.id, "cancelled", None, None, now) {
        eprintln!("cortex decay cancel: failed to transition job `{job_id}`: {err}");
        return Exit::Internal;
    }
    let report = CancelReport {
        job_id: record.id.to_string(),
        from_state: from_state_wire,
        to_state: "cancelled".into(),
        reason: args.reason.clone(),
    };
    if output::json_enabled() {
        let envelope = Envelope::new("cortex.decay.cancel", Exit::Ok, report);
        return output::emit(&envelope, Exit::Ok);
    }
    println!("cortex decay cancel: ok");
    println!("job_id={}", report.job_id);
    println!("from_state={}", report.from_state);
    println!("to_state={}", report.to_state);
    println!("reason={}", report.reason);
    Exit::Ok
}
/// Handle `cortex decay status`: show one job and, when it produced a result
/// memory, the source memory/episode ids that result superseded.
fn run_status(args: StatusArgs) -> Exit {
    let job_id = match args.job_id.parse::<DecayJobId>() {
        Ok(id) => id,
        Err(err) => {
            eprintln!(
                "cortex decay status: job id `{}` is not a valid decay job id: {err}",
                args.job_id
            );
            return Exit::Usage;
        }
    };
    let pool = match open_default_store("decay status") {
        Ok(pool) => pool,
        Err(exit) => return exit,
    };
    let repo = DecayJobRepo::new(&pool);
    let record = match repo.read(&job_id) {
        Ok(Some(rec)) => rec,
        Ok(None) => {
            eprintln!("cortex decay status: job `{job_id}` not found");
            return Exit::PreconditionUnmet;
        }
        Err(err) => {
            eprintln!("cortex decay status: failed to load job `{job_id}`: {err}");
            return Exit::Internal;
        }
    };
    let mut view = JobView::from_record(&record);
    // Only completed jobs with a result memory have superseded sources.
    if let Some(result) = record.result_memory_id.as_ref() {
        match repo.list_memory_sources_for(result) {
            Ok(rows) => {
                view.superseded_memory_ids = rows.into_iter().map(|id| id.to_string()).collect();
            }
            Err(err) => {
                eprintln!(
                    "cortex decay status: failed to list memory sources for `{result}`: {err}"
                );
                return Exit::Internal;
            }
        }
        match repo.list_episode_sources_for(result) {
            Ok(rows) => {
                view.superseded_episode_ids = rows.into_iter().map(|id| id.to_string()).collect();
            }
            Err(err) => {
                eprintln!(
                    "cortex decay status: failed to list episode sources for `{result}`: {err}"
                );
                return Exit::Internal;
            }
        }
    }
    if output::json_enabled() {
        let envelope = Envelope::new("cortex.decay.status", Exit::Ok, view);
        return output::emit(&envelope, Exit::Ok);
    }
    println!("cortex decay status: ok");
    println!("job_id={}", view.id);
    println!("kind={}", view.kind);
    println!("summary_method={}", view.summary_method);
    println!("state={}", view.state);
    println!("scheduled_for={}", view.scheduled_for);
    println!("created_at={}", view.created_at);
    println!("created_by={}", view.created_by);
    println!("updated_at={}", view.updated_at);
    if let Some(id) = &view.result_memory_id {
        println!("result_memory_id={id}");
    }
    if let Some(reason) = &view.state_reason {
        println!("state_reason={reason}");
    }
    if !view.superseded_memory_ids.is_empty() {
        println!(
            "superseded_memory_ids={}",
            view.superseded_memory_ids.join(",")
        );
    }
    if !view.superseded_episode_ids.is_empty() {
        println!(
            "superseded_episode_ids={}",
            view.superseded_episode_ids.join(",")
        );
    }
    Exit::Ok
}
/// Emit a structured refusal: always log a human-readable line to stderr,
/// additionally emit a refusal envelope in JSON mode, and return `exit`.
fn refuse(command: &'static str, exit: Exit, invariant: &'static str, reason: String) -> Exit {
    // Derive the bare command name ("cortex.decay.run" -> "decay run").
    let bare = command.replace("cortex.", "").replace('.', " ");
    eprintln!("cortex {bare}: {invariant}: {reason}");
    if !output::json_enabled() {
        return exit;
    }
    let report = RefusalReport { invariant, reason };
    let outcome = Outcome::from_exit(exit);
    let envelope = Envelope::new(command, exit, report).with_outcome(outcome);
    output::emit(&envelope, exit)
}
#[cfg(test)]
mod tests {
    use super::*;
    // Wire names must match the substrate's snake_case alphabet exactly.
    #[test]
    fn kind_flag_wire_matches_substrate_alphabet() {
        assert_eq!(KindFlag::EpisodeCompression.wire(), "episode_compression");
        assert_eq!(
            KindFlag::CandidateCompression.wire(),
            "candidate_compression"
        );
        assert_eq!(
            KindFlag::ExpiredPrincipleReview.wire(),
            "expired_principle_review"
        );
    }
    // The llm flag must always demand an operator attestation.
    #[test]
    fn summary_method_flag_round_trips_to_substrate_enum() {
        match SummaryMethodFlag::DeterministicConcatenate.to_summary_method() {
            SummaryMethod::DeterministicConcatenate => {}
            other => panic!("expected deterministic, got {other:?}"),
        }
        match SummaryMethodFlag::Llm.to_summary_method() {
            SummaryMethod::LlmSummary {
                operator_attestation_required,
            } => {
                assert!(operator_attestation_required);
            }
            other => panic!("expected llm, got {other:?}"),
        }
    }
    #[test]
    fn build_kind_episode_compression_requires_episode_ids() {
        let args = sample_args(KindFlag::EpisodeCompression, None, None, None);
        let err = build_kind(&args).unwrap_err();
        assert!(err.contains("--episode-ids"), "got: {err}");
    }
    #[test]
    fn build_kind_candidate_compression_rejects_principle_id() {
        let mut args = sample_args(
            KindFlag::CandidateCompression,
            None,
            Some(format!("{}", MemoryId::new())),
            None,
        );
        args.principle_id = Some(format!("{}", PrincipleId::new()));
        let err = build_kind(&args).unwrap_err();
        assert!(err.contains("candidate-compression"), "got: {err}");
    }
    #[test]
    fn build_kind_expired_principle_review_requires_principle_id() {
        let args = sample_args(KindFlag::ExpiredPrincipleReview, None, None, None);
        let err = build_kind(&args).unwrap_err();
        assert!(err.contains("--principle-id"), "got: {err}");
    }
    #[test]
    fn build_kind_expired_principle_review_refuses_llm_summary() {
        let raw_id = format!("{}", PrincipleId::new());
        let mut args = sample_args(KindFlag::ExpiredPrincipleReview, None, None, Some(raw_id));
        args.summary_method = SummaryMethodFlag::Llm;
        let err = build_kind(&args).unwrap_err();
        assert!(err.contains("expired-principle-review"), "got: {err}");
    }
    // Baseline ScheduleArgs for build_kind tests: deterministic summary,
    // no timestamps/attestation, caller supplies only the id flags under test.
    fn sample_args(
        kind: KindFlag,
        episodes: Option<String>,
        memories: Option<String>,
        principle: Option<String>,
    ) -> ScheduleArgs {
        ScheduleArgs {
            kind,
            episode_ids: episodes,
            memory_ids: memories,
            principle_id: principle,
            summary_method: SummaryMethodFlag::DeterministicConcatenate,
            scheduled_for: None,
            operator_attestation: None,
            created_by: "operator:test".into(),
            run_immediately: false,
        }
    }
}