
cortex_memory/decay/runner.rs

//! Phase 4.D decay job runner.
//!
//! ## Contract
//!
//! - **Atomic**: the `Pending -> InProgress` claim and the `InProgress ->
//!   Completed | Failed` settle happen via [`DecayJobRepo::update_state`]
//!   calls. The substrate refuses no-op updates (no matching row → hard
//!   error), so a stale claim surfaces as an error rather than being
//!   silently dropped. The substrate does not currently compare-and-swap
//!   on the prior state; the runner guards against double-claims by
//!   proceeding only when it has just observed `Pending` (loaded and then
//!   immediately transitioned).
//! - **Idempotent**: re-running a terminal (`Completed`, `Failed`,
//!   `Cancelled`) job is a no-op ([`run_specific_job`] returns Ok
//!   without re-dispatching). The pending surface
//!   ([`run_next_pending_job`]) returns `Ok(None)` when the queue is
//!   empty.
//! - **Fail-closed**: any error inside the dispatched method transitions
//!   the job to `Failed` with the error's stable invariant (when one
//!   exists) or its `Display` text otherwise. The runner never leaves a
//!   job stuck in `InProgress` on a normal error path.
//! - **Observable**: every stage emits a diagnostic line on stderr under
//!   a stable prefix (`cortex_memory::decay::runner`) so operators can
//!   tail logs without parsing JSON envelopes.
//!
//! ## Why "now" is a parameter
//!
//! The runner takes an explicit `now: DateTime<Utc>` for the scheduling
//! window. The CLI and dispatcher both pass `Utc::now()` in production;
//! tests pass a frozen timestamp so the pending-pickup query is
//! deterministic.
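//!
//! ## Usage sketch
//!
//! A minimal driving loop, as a sketch rather than the wired production
//! path: `pool` here is an assumed, already-migrated [`Pool`], which is
//! why the example is left uncompiled.
//!
//! ```ignore
//! use chrono::Utc;
//! use cortex_llm::NoopSummaryBackend;
//!
//! let backend = NoopSummaryBackend;
//! // Each call claims at most one ready job; `Ok(None)` means the queue
//! // is empty for this window.
//! while let Some(id) = run_next_pending_job(&pool, Utc::now(), &backend)? {
//!     eprintln!("decay job {id} settled");
//! }
//! ```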

use std::path::Path;

use chrono::{DateTime, Utc};
use cortex_core::{DecayJobId, EpisodeId, MemoryId};
use cortex_llm::SummaryBackend;
use cortex_store::repo::{DecayJobRecord, DecayJobRepo};
use cortex_store::Pool;
use serde_json::Value;

use super::{
    compress, summary, DecayError, DecayJob, DecayJobKind, DecayJobState, DecayResult,
    SummaryMethod,
};

/// Run the next pending decay job whose `scheduled_for <= now`, if any.
///
/// Returns `Ok(Some(id))` when a job ran to `Completed`, `Ok(None)` when
/// the queue is empty for the supplied window, and the dispatch error
/// when the job failed (the job row is settled to `Failed` before the
/// error propagates).
///
/// `summary_backend` is consulted only for LLM-summary jobs. Pass
/// [`cortex_llm::NoopSummaryBackend`] (the fail-closed default) to keep
/// production paths from running LLM jobs unattended; LLM jobs without
/// `operator_attestation` are always refused via the runner's typed
/// surface regardless of which backend is wired.
pub fn run_next_pending_job(
    pool: &Pool,
    now: DateTime<Utc>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<Option<DecayJobId>> {
    run_next_pending_job_with_attestation(pool, now, None, summary_backend)
}

/// Variant of [`run_next_pending_job`] that lets the caller supply an
/// operator-attestation path for the next pending LLM-summary job. The
/// dequeue path does not select on summary method, so the attestation is
/// consumed only if the job it picks turns out to be an LLM-summary job;
/// deterministic jobs ignore it.
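///
/// A hypothetical wiring (a sketch: `pool` and the attestation path are
/// assumptions, not part of this module, so the example is not compiled):
///
/// ```ignore
/// use std::path::Path;
///
/// use chrono::Utc;
/// use cortex_llm::NoopSummaryBackend;
///
/// let attestation = Path::new("ops/attestations/decay.json");
/// let backend = NoopSummaryBackend;
/// let ran = run_next_pending_job_with_attestation(
///     &pool,
///     Utc::now(),
///     Some(attestation),
///     &backend,
/// )?;
/// ```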
pub fn run_next_pending_job_with_attestation(
    pool: &Pool,
    now: DateTime<Utc>,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<Option<DecayJobId>> {
    let repo = DecayJobRepo::new(pool);
    let pending = repo.list_pending_ready(now)?;
    let Some(record) = pending.into_iter().next() else {
        log_stage("idle", None, "no pending decay jobs");
        return Ok(None);
    };
    let id = record.id;
    run_loaded_record(pool, record, now, operator_attestation, summary_backend)?;
    Ok(Some(id))
}

/// Run a specific decay job by id.
///
/// Idempotency: a terminal job (`Completed`, `Failed`, `Cancelled`) is a
/// no-op. A `Pending` job is dispatched. An `InProgress` job is treated
/// as a recovery situation: the runner refuses to re-claim it (another
/// runner may still be live) and surfaces a validation error rather than
/// silently re-running.
///
/// `summary_backend` is consulted only for LLM-summary jobs. See
/// [`run_next_pending_job`] for the backend posture.
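///
/// A sketch of the idempotent re-run (`pool` and `job_id` are assumed
/// from an earlier enqueue, so the example is not compiled):
///
/// ```ignore
/// use chrono::Utc;
/// use cortex_llm::NoopSummaryBackend;
///
/// let backend = NoopSummaryBackend;
/// run_specific_job(&pool, &job_id, Utc::now(), &backend)?; // Pending -> Completed
/// run_specific_job(&pool, &job_id, Utc::now(), &backend)?; // terminal: no-op
/// ```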
pub fn run_specific_job(
    pool: &Pool,
    id: &DecayJobId,
    now: DateTime<Utc>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
    run_specific_job_with_attestation(pool, id, now, None, summary_backend)
}

/// Variant of [`run_specific_job`] that allows the caller to supply an
/// operator-attestation path for the dispatched job. Required when the
/// resolved job is an LLM-summary job; deterministic jobs ignore it.
pub fn run_specific_job_with_attestation(
    pool: &Pool,
    id: &DecayJobId,
    now: DateTime<Utc>,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
    let repo = DecayJobRepo::new(pool);
    let record = repo
        .read(id)?
        .ok_or_else(|| DecayError::Validation(format!("decay job {id} not found")))?;
    let job: DecayJob =
        record
            .clone()
            .try_into()
            .map_err(|err: super::DecayJobConversionError| {
                DecayError::Validation(format!("decay job {id} row malformed: {err}"))
            })?;
    match job.state {
        DecayJobState::Completed { .. }
        | DecayJobState::Failed { .. }
        | DecayJobState::Cancelled => {
            log_stage(
                "skip_terminal",
                Some(id),
                &format!("job is already {}; no-op", job.state.state_wire()),
            );
            Ok(())
        }
        DecayJobState::InProgress => Err(DecayError::Validation(format!(
            "decay job {id} is already in_progress; refusing to re-claim from another runner",
        ))),
        DecayJobState::Pending => {
            run_loaded_record(pool, record, now, operator_attestation, summary_backend)
        }
    }
}

fn run_loaded_record(
    pool: &Pool,
    record: DecayJobRecord,
    now: DateTime<Utc>,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
    let job: DecayJob =
        record
            .clone()
            .try_into()
            .map_err(|err: super::DecayJobConversionError| {
                DecayError::Validation(format!("decay job {} row malformed: {err}", record.id))
            })?;
    let repo = DecayJobRepo::new(pool);

    // Claim: Pending -> InProgress. update_state with state="in_progress"
    // and no payload. The substrate refuses if no row matches, which
    // surfaces a double-claim drift as a hard error.
    log_stage(
        "claim",
        Some(&job.id),
        "transitioning pending -> in_progress",
    );
    repo.update_state(&job.id, "in_progress", None, None, now)?;

    // Dispatch on the typed kind.
    let dispatch_result = match &job.kind {
        DecayJobKind::CandidateCompression {
            source_memory_ids,
            summary_method,
        } => dispatch_candidate_compression(
            pool,
            &record,
            &job.id,
            source_memory_ids,
            summary_method,
            &job.created_by,
            operator_attestation,
            summary_backend,
        ),
        DecayJobKind::EpisodeCompression {
            source_episode_ids,
            summary_method,
        } => dispatch_episode_compression(
            pool,
            &record,
            &job.id,
            source_episode_ids,
            summary_method,
            &job.created_by,
            operator_attestation,
            summary_backend,
        ),
        DecayJobKind::ExpiredPrincipleReview { .. } => {
            // The expired-principle-review kind opens a ceremony rather
            // than landing a memory; the substrate that landed in D3-A
            // intentionally has no scheduler-side compression to do here.
            // The runner records the job as completed-without-memory so
            // the operator-facing surface can pick up the ceremony
            // separately. This keeps the durable state machine honest: a
            // successful invocation transitions to Completed rather than
            // sticking in InProgress.
            log_stage(
                "dispatch",
                Some(&job.id),
                "kind=expired_principle_review (no in-process work; ceremony opened separately)",
            );
            Ok(DispatchOutcome::CompletedWithoutMemory)
        }
    };

    // Settle: InProgress -> Completed | Failed.
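    // The settle timestamp uses the wall clock rather than the
    // caller-supplied `now`; `now` only scopes the pending-pickup window
    // (see the module docs).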
    let settle_now = Utc::now();
    match dispatch_result {
        Ok(DispatchOutcome::CompletedWithMemory(memory_id)) => {
            log_stage(
                "settle",
                Some(&job.id),
                &format!("compression ok; produced {memory_id}"),
            );
            repo.update_state(&job.id, "completed", None, Some(&memory_id), settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to completed");
            Ok(())
        }
        Ok(DispatchOutcome::CompletedWithoutMemory) => {
            log_stage(
                "settle",
                Some(&job.id),
                "ceremony opened; no memory produced",
            );
            repo.update_state(&job.id, "completed", None, None, settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to completed");
            Ok(())
        }
        Err(err) => {
            let reason = invariant_or_display(&err);
            log_stage(
                "settle",
                Some(&job.id),
                &format!("compression failed: {reason}"),
            );
            // Refuse to write an empty state_reason (the substrate would
            // reject it anyway); fall back to a generic marker.
            let stable_reason = if reason.trim().is_empty() {
                "decay.runner.failure".to_string()
            } else {
                reason
            };
            repo.update_state(&job.id, "failed", Some(&stable_reason), None, settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to failed");
            Err(err)
        }
    }
}

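/// What a dispatched kind produced: compression kinds yield the id of
/// the summary memory they landed; ceremony-opening kinds complete
/// without one.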
enum DispatchOutcome {
    CompletedWithMemory(MemoryId),
    CompletedWithoutMemory,
}

#[allow(clippy::too_many_arguments)]
fn dispatch_candidate_compression(
    pool: &Pool,
    record: &DecayJobRecord,
    job_id: &DecayJobId,
    source_memory_ids: &[MemoryId],
    summary_method: &SummaryMethod,
    operator: &str,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<DispatchOutcome> {
    match summary_method {
        SummaryMethod::DeterministicConcatenate => {
            log_stage(
                "dispatch",
                Some(job_id),
                "kind=candidate_compression method=deterministic_concatenate",
            );
            let produced = compress::compress_candidate_memories_with_job(
                pool,
                source_memory_ids,
                operator,
                Some(job_id),
            )?;
            Ok(DispatchOutcome::CompletedWithMemory(produced))
        }
        SummaryMethod::LlmSummary { .. } => {
            log_stage(
                "dispatch",
                Some(job_id),
                "kind=candidate_compression method=llm_summary (operator-fired)",
            );
            let produced =
                summary::run_llm_summary_job(pool, record, operator_attestation, summary_backend)?;
            Ok(DispatchOutcome::CompletedWithMemory(produced))
        }
    }
}

#[allow(clippy::too_many_arguments)]
fn dispatch_episode_compression(
    pool: &Pool,
    record: &DecayJobRecord,
    job_id: &DecayJobId,
    source_episode_ids: &[EpisodeId],
    summary_method: &SummaryMethod,
    operator: &str,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<DispatchOutcome> {
    match summary_method {
        SummaryMethod::DeterministicConcatenate => {
            log_stage(
                "dispatch",
                Some(job_id),
                "kind=episode_compression method=deterministic_concatenate",
            );
            let produced = compress::compress_episodes_with_job(
                pool,
                source_episode_ids,
                operator,
                Some(job_id),
            )?;
            Ok(DispatchOutcome::CompletedWithMemory(produced))
        }
        SummaryMethod::LlmSummary { .. } => {
            log_stage(
                "dispatch",
                Some(job_id),
                "kind=episode_compression method=llm_summary (operator-fired)",
            );
            let produced =
                summary::run_llm_summary_job(pool, record, operator_attestation, summary_backend)?;
            Ok(DispatchOutcome::CompletedWithMemory(produced))
        }
    }
}

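/// Prefer the error's stable invariant token over its free-form
/// `Display` text, so persisted failure reasons stay machine-matchable.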
fn invariant_or_display(err: &DecayError) -> String {
    err.invariant()
        .map(str::to_string)
        .unwrap_or_else(|| err.to_string())
}

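/// Emit one diagnostic line on stderr under the stable
/// `cortex_memory::decay::runner` prefix (see the Observable bullet in
/// the module docs).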
fn log_stage(stage: &str, job_id: Option<&DecayJobId>, message: &str) {
    match job_id {
        Some(id) => eprintln!("cortex_memory::decay::runner stage={stage} job={id} {message}"),
        None => eprintln!("cortex_memory::decay::runner stage={stage} {message}"),
    }
}

/// Extract the decay-job kind discriminator string from a raw JSON value
/// (the wire form of `super::DecayJobKind::kind_wire`). Kept private to
/// the runner module; currently unused.
#[allow(dead_code)]
fn kind_wire_of(value: &Value) -> Option<&str> {
    value.as_str()
}

#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use cortex_core::MemoryId;
    use cortex_llm::NoopSummaryBackend;
    use cortex_store::migrate::apply_pending;
    use cortex_store::repo::{MemoryCandidate, MemoryRepo};
    use rusqlite::Connection;
    use serde_json::Value;

    fn seed_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }

    fn at(offset_seconds: i64) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
            + chrono::Duration::seconds(offset_seconds)
    }

    fn insert_test_memory(pool: &Pool, claim: &str) -> MemoryId {
        let id = MemoryId::new();
        let candidate = MemoryCandidate {
            id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(vec![Value::String(
                "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV".into(),
            )]),
            domains_json: Value::Array(vec![Value::String("t".into())]),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence: 0.7,
            authority: "candidate".into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool).insert_candidate(&candidate).unwrap();
        id
    }

    fn enqueue_candidate_det_job(pool: &Pool, sources: &[MemoryId]) -> DecayJobId {
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: sources.to_vec(),
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            state: DecayJobState::Pending,
            scheduled_for: at(0),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(pool).insert(&record).unwrap();
        id
    }

    fn enqueue_candidate_llm_job(pool: &Pool, sources: &[MemoryId]) -> DecayJobId {
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: sources.to_vec(),
                summary_method: SummaryMethod::LlmSummary {
                    operator_attestation_required: true,
                },
            },
            state: DecayJobState::Pending,
            scheduled_for: at(0),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(pool).insert(&record).unwrap();
        id
    }

    #[test]
    fn runner_transitions_pending_to_completed_atomically() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let m2 = insert_test_memory(&pool, "beta");
        let id = enqueue_candidate_det_job(&pool, &[m1, m2]);

        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("run ok");
        assert_eq!(ran, Some(id));

        let repo = DecayJobRepo::new(&pool);
        let record = repo.read(&id).unwrap().unwrap();
        // After dispatch, the deterministic job should be Completed and
        // carry a produced summary memory id. State machine path is
        // Pending -> InProgress -> Completed, all wrapped in repo
        // update_state calls.
        assert_eq!(record.state_wire, "completed");
        assert!(record.result_memory_id.is_some());
    }

    #[test]
    fn runner_marks_failed_on_inner_error_with_invariant_reason() {
        let pool = seed_pool();
        // No memories inserted: the source ids will not resolve. The
        // dispatcher returns DecayError::Validation and the runner
        // transitions the job to Failed.
        let phantom = MemoryId::new();
        let id = enqueue_candidate_det_job(&pool, &[phantom]);

        let backend = NoopSummaryBackend;
        let err =
            run_next_pending_job(&pool, at(60), &backend).expect_err("inner error must propagate");
        match err {
            DecayError::Validation(msg) => {
                assert!(
                    msg.contains(super::super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT),
                    "msg: {msg}"
                );
            }
            other => panic!("expected Validation, got {other:?}"),
        }
        let record = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "failed");
        let reason = record.state_reason.expect("failure reason persisted");
        assert!(!reason.trim().is_empty());
    }

    #[test]
    fn runner_completed_job_is_idempotent_on_re_run() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let m2 = insert_test_memory(&pool, "beta");
        let id = enqueue_candidate_det_job(&pool, &[m1, m2]);

        let backend = NoopSummaryBackend;
        // First run: Pending -> Completed.
        run_next_pending_job(&pool, at(60), &backend).expect("first run ok");
        let before = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(before.state_wire, "completed");

        // Re-running by id is a no-op (idempotent on terminal state).
        run_specific_job(&pool, &id, at(70), &backend).expect("re-run is a no-op");
        let after = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(after, before, "completed job must be untouched on re-run");
    }

    #[test]
    fn runner_idle_when_queue_empty() {
        let pool = seed_pool();
        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("idle ok");
        assert_eq!(ran, None);
    }

    #[test]
    fn runner_refuses_in_progress_re_claim() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = enqueue_candidate_det_job(&pool, &[m1]);
        // Force the job into InProgress without dispatching.
        DecayJobRepo::new(&pool)
            .update_state(&id, "in_progress", None, None, at(20))
            .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_specific_job(&pool, &id, at(30), &backend)
            .expect_err("in-progress job must refuse");
        assert!(matches!(err, DecayError::Validation(_)));
    }

    #[test]
    fn runner_llm_job_fails_closed_without_attestation_in_dequeue_path() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = enqueue_candidate_llm_job(&pool, &[m1]);
        let backend = NoopSummaryBackend;
        let err =
            run_next_pending_job(&pool, at(60), &backend).expect_err("llm via dequeue must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryRequiresOperatorAttestation),
            "got {err:?}"
        );
        let record = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "failed");
        assert_eq!(
            record.state_reason.as_deref(),
            Some(super::super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
        );
    }

    #[test]
    fn runner_skips_when_scheduled_for_in_future() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");

        // Enqueue with scheduled_for at t=120; run at t=60.
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: vec![m1],
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            state: DecayJobState::Pending,
            scheduled_for: at(120),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(&pool).insert(&record).unwrap();

        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("idle until window opens");
        assert_eq!(ran, None);

        // After the window opens, the runner picks it up.
        let ran = run_next_pending_job(&pool, at(180), &backend).expect("dispatch ok");
        assert_eq!(ran, Some(id));
    }
}
591}