use std::path::Path;
use chrono::{DateTime, Utc};
use cortex_core::{DecayJobId, EpisodeId, MemoryId};
use cortex_llm::SummaryBackend;
use cortex_store::repo::{DecayJobRecord, DecayJobRepo};
use cortex_store::Pool;
use serde_json::Value;
use super::{
compress, summary, DecayError, DecayJob, DecayJobKind, DecayJobState, DecayResult,
SummaryMethod,
};
pub fn run_next_pending_job(
pool: &Pool,
now: DateTime<Utc>,
summary_backend: &dyn SummaryBackend,
) -> DecayResult<Option<DecayJobId>> {
run_next_pending_job_with_attestation(pool, now, None, summary_backend)
}
/// Dequeues the first pending job whose schedule window has opened at `now`
/// and runs it, optionally carrying an operator attestation for LLM work.
///
/// Returns `Ok(None)` when nothing is ready; otherwise the id of the job
/// that was claimed (the run's error, if any, propagates to the caller).
pub fn run_next_pending_job_with_attestation(
    pool: &Pool,
    now: DateTime<Utc>,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<Option<DecayJobId>> {
    let ready = DecayJobRepo::new(pool).list_pending_ready(now)?;
    match ready.into_iter().next() {
        None => {
            log_stage("idle", None, "no pending decay jobs");
            Ok(None)
        }
        Some(record) => {
            let claimed_id = record.id;
            run_loaded_record(pool, record, now, operator_attestation, summary_backend)?;
            Ok(Some(claimed_id))
        }
    }
}
pub fn run_specific_job(
pool: &Pool,
id: &DecayJobId,
now: DateTime<Utc>,
summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
run_specific_job_with_attestation(pool, id, now, None, summary_backend)
}
pub fn run_specific_job_with_attestation(
pool: &Pool,
id: &DecayJobId,
now: DateTime<Utc>,
operator_attestation: Option<&Path>,
summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
let repo = DecayJobRepo::new(pool);
let record = repo
.read(id)?
.ok_or_else(|| DecayError::Validation(format!("decay job {id} not found")))?;
let job: DecayJob =
record
.clone()
.try_into()
.map_err(|err: super::DecayJobConversionError| {
DecayError::Validation(format!("decay job {id} row malformed: {err}"))
})?;
match job.state {
DecayJobState::Completed { .. }
| DecayJobState::Failed { .. }
| DecayJobState::Cancelled => {
log_stage(
"skip_terminal",
Some(id),
&format!("job is already {}; no-op", job.state.state_wire()),
);
Ok(())
}
DecayJobState::InProgress => Err(DecayError::Validation(format!(
"decay job {id} is already in_progress; refusing to re-claim from another runner",
))),
DecayJobState::Pending => {
run_loaded_record(pool, record, now, operator_attestation, summary_backend)
}
}
}
/// Claims a decay job row, dispatches its kind-specific work, and settles the
/// row to `completed` or `failed`. Callers are responsible for checking that
/// `record` is safe to claim (i.e. currently pending).
///
/// State transitions persisted here, in order:
/// 1. pending -> in_progress (before any work runs)
/// 2. in_progress -> completed (with optional result memory id), or
///    in_progress -> failed (with a non-empty stable reason)
fn run_loaded_record(
    pool: &Pool,
    record: DecayJobRecord,
    now: DateTime<Utc>,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
    // Decode the row into the typed job; a malformed row is a validation error.
    // The clone keeps `record` available for the dispatch calls below.
    let job: DecayJob =
        record
            .clone()
            .try_into()
            .map_err(|err: super::DecayJobConversionError| {
                DecayError::Validation(format!("decay job {} row malformed: {err}", record.id))
            })?;
    let repo = DecayJobRepo::new(pool);
    // Claim first: persist in_progress before doing any work so another
    // runner reading the row sees it as taken.
    log_stage(
        "claim",
        Some(&job.id),
        "transitioning pending -> in_progress",
    );
    repo.update_state(&job.id, "in_progress", None, None, now)?;
    // Dispatch by job kind; each arm reports whether a memory was produced.
    let dispatch_result = match &job.kind {
        DecayJobKind::CandidateCompression {
            source_memory_ids,
            summary_method,
        } => dispatch_candidate_compression(
            pool,
            &record,
            &job.id,
            source_memory_ids,
            summary_method,
            &job.created_by,
            operator_attestation,
            summary_backend,
        ),
        DecayJobKind::EpisodeCompression {
            source_episode_ids,
            summary_method,
        } => dispatch_episode_compression(
            pool,
            &record,
            &job.id,
            source_episode_ids,
            summary_method,
            &job.created_by,
            operator_attestation,
            summary_backend,
        ),
        DecayJobKind::ExpiredPrincipleReview { .. } => {
            log_stage(
                "dispatch",
                Some(&job.id),
                "kind=expired_principle_review (no in-process work; ceremony opened separately)",
            );
            Ok(DispatchOutcome::CompletedWithoutMemory)
        }
    };
    // NOTE(review): the settle timestamp uses wall-clock time while the claim
    // above used the injected `now` — confirm this asymmetry is intentional
    // (it makes the settle time non-deterministic under test).
    let settle_now = Utc::now();
    match dispatch_result {
        Ok(DispatchOutcome::CompletedWithMemory(memory_id)) => {
            log_stage(
                "settle",
                Some(&job.id),
                &format!("compression ok; produced {memory_id}"),
            );
            repo.update_state(&job.id, "completed", None, Some(&memory_id), settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to completed");
            Ok(())
        }
        Ok(DispatchOutcome::CompletedWithoutMemory) => {
            log_stage(
                "settle",
                Some(&job.id),
                "ceremony opened; no memory produced",
            );
            repo.update_state(&job.id, "completed", None, None, settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to completed");
            Ok(())
        }
        Err(err) => {
            // Prefer the stable invariant token over the display message as
            // the persisted failure reason.
            let reason = invariant_or_display(&err);
            log_stage(
                "settle",
                Some(&job.id),
                &format!("compression failed: {reason}"),
            );
            // Never persist an empty reason; fall back to a stable sentinel.
            let stable_reason = if reason.trim().is_empty() {
                "decay.runner.failure".to_string()
            } else {
                reason
            };
            repo.update_state(&job.id, "failed", Some(&stable_reason), None, settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to failed");
            // Propagate the original error after the failed state is recorded.
            Err(err)
        }
    }
}
/// Result of dispatching a decay job's kind-specific work.
enum DispatchOutcome {
    /// Compression produced a new memory; its id is persisted on the job row.
    CompletedWithMemory(MemoryId),
    /// The job finished without producing a memory (e.g. a review ceremony is
    /// opened out-of-process).
    CompletedWithoutMemory,
}
/// Dispatches a candidate-compression job to the backend selected by its
/// summary method, returning the produced memory id.
#[allow(clippy::too_many_arguments)]
fn dispatch_candidate_compression(
    pool: &Pool,
    record: &DecayJobRecord,
    job_id: &DecayJobId,
    source_memory_ids: &[MemoryId],
    summary_method: &SummaryMethod,
    operator: &str,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<DispatchOutcome> {
    // Operator-fired LLM summarization path.
    if let SummaryMethod::LlmSummary { .. } = summary_method {
        log_stage(
            "dispatch",
            Some(job_id),
            "kind=candidate_compression method=llm_summary (operator-fired)",
        );
        let memory =
            summary::run_llm_summary_job(pool, record, operator_attestation, summary_backend)?;
        return Ok(DispatchOutcome::CompletedWithMemory(memory));
    }
    // Remaining variant: SummaryMethod::DeterministicConcatenate.
    log_stage(
        "dispatch",
        Some(job_id),
        "kind=candidate_compression method=deterministic_concatenate",
    );
    let memory = compress::compress_candidate_memories_with_job(
        pool,
        source_memory_ids,
        operator,
        Some(job_id),
    )?;
    Ok(DispatchOutcome::CompletedWithMemory(memory))
}
/// Dispatches an episode-compression job to the backend selected by its
/// summary method, returning the produced memory id.
#[allow(clippy::too_many_arguments)]
fn dispatch_episode_compression(
    pool: &Pool,
    record: &DecayJobRecord,
    job_id: &DecayJobId,
    source_episode_ids: &[EpisodeId],
    summary_method: &SummaryMethod,
    operator: &str,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<DispatchOutcome> {
    // Operator-fired LLM summarization path.
    if let SummaryMethod::LlmSummary { .. } = summary_method {
        log_stage(
            "dispatch",
            Some(job_id),
            "kind=episode_compression method=llm_summary (operator-fired)",
        );
        let memory =
            summary::run_llm_summary_job(pool, record, operator_attestation, summary_backend)?;
        return Ok(DispatchOutcome::CompletedWithMemory(memory));
    }
    // Remaining variant: SummaryMethod::DeterministicConcatenate.
    log_stage(
        "dispatch",
        Some(job_id),
        "kind=episode_compression method=deterministic_concatenate",
    );
    let memory = compress::compress_episodes_with_job(
        pool,
        source_episode_ids,
        operator,
        Some(job_id),
    )?;
    Ok(DispatchOutcome::CompletedWithMemory(memory))
}
/// Picks the stable invariant token for an error when one exists, otherwise
/// falls back to its display text.
fn invariant_or_display(err: &DecayError) -> String {
    match err.invariant() {
        Some(token) => token.to_string(),
        None => err.to_string(),
    }
}
/// Emits a structured runner trace line to stderr, tagging the job id when
/// one is known.
fn log_stage(stage: &str, job_id: Option<&DecayJobId>, message: &str) {
    if let Some(id) = job_id {
        eprintln!("cortex_memory::decay::runner stage={stage} job={id} {message}");
    } else {
        eprintln!("cortex_memory::decay::runner stage={stage} {message}");
    }
}
#[allow(dead_code)]
fn kind_wire_of(value: &Value) -> Option<&str> {
value.as_str()
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use cortex_core::MemoryId;
    use cortex_llm::NoopSummaryBackend;
    use cortex_store::migrate::apply_pending;
    use cortex_store::repo::{MemoryCandidate, MemoryRepo};
    use rusqlite::Connection;
    use serde_json::Value;

    /// Fresh in-memory SQLite connection with all migrations applied.
    fn seed_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }

    /// Deterministic base instant (2026-05-13T12:00:00Z) plus an offset in
    /// seconds, so scheduling assertions don't depend on the wall clock.
    fn at(offset_seconds: i64) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
            + chrono::Duration::seconds(offset_seconds)
    }

    /// Inserts a minimal candidate memory row with the given claim text and
    /// returns its id, for use as a compression source.
    fn insert_test_memory(pool: &Pool, claim: &str) -> MemoryId {
        let id = MemoryId::new();
        let candidate = MemoryCandidate {
            id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(vec![Value::String(
                "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV".into(),
            )]),
            domains_json: Value::Array(vec![Value::String("t".into())]),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence: 0.7,
            authority: "candidate".into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool).insert_candidate(&candidate).unwrap();
        id
    }

    /// Enqueues a pending candidate-compression job using the deterministic
    /// concatenation method, scheduled at the base instant.
    fn enqueue_candidate_det_job(pool: &Pool, sources: &[MemoryId]) -> DecayJobId {
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: sources.to_vec(),
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            state: DecayJobState::Pending,
            scheduled_for: at(0),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(pool).insert(&record).unwrap();
        id
    }

    /// Enqueues a pending candidate-compression job using the LLM summary
    /// method with attestation required, scheduled at the base instant.
    fn enqueue_candidate_llm_job(pool: &Pool, sources: &[MemoryId]) -> DecayJobId {
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: sources.to_vec(),
                summary_method: SummaryMethod::LlmSummary {
                    operator_attestation_required: true,
                },
            },
            state: DecayJobState::Pending,
            scheduled_for: at(0),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(pool).insert(&record).unwrap();
        id
    }

    // Happy path: dequeue claims the job, runs compression, and persists
    // completed state plus the produced memory id.
    #[test]
    fn runner_transitions_pending_to_completed_atomically() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let m2 = insert_test_memory(&pool, "beta");
        let id = enqueue_candidate_det_job(&pool, &[m1, m2]);
        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("run ok");
        assert_eq!(ran, Some(id));
        let repo = DecayJobRepo::new(&pool);
        let record = repo.read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "completed");
        assert!(record.result_memory_id.is_some());
    }

    // A job referencing a nonexistent source memory must fail, propagate the
    // error, and persist a non-empty failure reason on the row.
    #[test]
    fn runner_marks_failed_on_inner_error_with_invariant_reason() {
        let pool = seed_pool();
        let phantom = MemoryId::new();
        let id = enqueue_candidate_det_job(&pool, &[phantom]);
        let backend = NoopSummaryBackend;
        let err =
            run_next_pending_job(&pool, at(60), &backend).expect_err("inner error must propagate");
        match err {
            DecayError::Validation(msg) => {
                assert!(
                    msg.contains(super::super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT),
                    "msg: {msg}"
                );
            }
            other => panic!("expected Validation, got {other:?}"),
        }
        let record = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "failed");
        let reason = record.state_reason.expect("failure reason persisted");
        assert!(!reason.trim().is_empty());
    }

    // Re-running a completed job by id must be a no-op that leaves the row
    // byte-identical.
    #[test]
    fn runner_completed_job_is_idempotent_on_re_run() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let m2 = insert_test_memory(&pool, "beta");
        let id = enqueue_candidate_det_job(&pool, &[m1, m2]);
        let backend = NoopSummaryBackend;
        run_next_pending_job(&pool, at(60), &backend).expect("first run ok");
        let before = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(before.state_wire, "completed");
        run_specific_job(&pool, &id, at(70), &backend).expect("re-run is a no-op");
        let after = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(after, before, "completed job must be untouched on re-run");
    }

    // Empty queue: dequeue returns Ok(None) rather than an error.
    #[test]
    fn runner_idle_when_queue_empty() {
        let pool = seed_pool();
        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("idle ok");
        assert_eq!(ran, None);
    }

    // A job already marked in_progress must be refused (another runner owns it).
    #[test]
    fn runner_refuses_in_progress_re_claim() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = enqueue_candidate_det_job(&pool, &[m1]);
        DecayJobRepo::new(&pool)
            .update_state(&id, "in_progress", None, None, at(20))
            .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_specific_job(&pool, &id, at(30), &backend)
            .expect_err("in-progress job must refuse");
        assert!(matches!(err, DecayError::Validation(_)));
    }

    // LLM-summary jobs picked up via the dequeue path (no attestation) must
    // fail closed and persist the stable attestation-required invariant.
    #[test]
    fn runner_llm_job_fails_closed_without_attestation_in_dequeue_path() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = enqueue_candidate_llm_job(&pool, &[m1]);
        let backend = NoopSummaryBackend;
        let err =
            run_next_pending_job(&pool, at(60), &backend).expect_err("llm via dequeue must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryRequiresOperatorAttestation),
            "got {err:?}"
        );
        let record = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "failed");
        assert_eq!(
            record.state_reason.as_deref(),
            Some(super::super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
        );
    }

    // A job scheduled in the future is invisible to the dequeue until `now`
    // reaches its scheduled_for.
    #[test]
    fn runner_skips_when_scheduled_for_in_future() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: vec![m1],
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            state: DecayJobState::Pending,
            scheduled_for: at(120),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(&pool).insert(&record).unwrap();
        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("idle until window opens");
        assert_eq!(ran, None);
        let ran = run_next_pending_job(&pool, at(180), &backend).expect("dispatch ok");
        assert_eq!(ran, Some(id));
    }
}