1use std::path::Path;
33
34use chrono::{DateTime, Utc};
35use cortex_core::{DecayJobId, EpisodeId, MemoryId};
36use cortex_llm::SummaryBackend;
37use cortex_store::repo::{DecayJobRecord, DecayJobRepo};
38use cortex_store::Pool;
39use serde_json::Value;
40
41use super::{
42 compress, summary, DecayError, DecayJob, DecayJobKind, DecayJobState, DecayResult,
43 SummaryMethod,
44};
45
/// Dequeues and runs the single oldest pending decay job that is ready at `now`.
///
/// Convenience wrapper over [`run_next_pending_job_with_attestation`] that supplies
/// no operator attestation, so LLM-summary jobs taken via this path fail closed
/// (exercised by `runner_llm_job_fails_closed_without_attestation_in_dequeue_path`).
///
/// Returns `Ok(Some(id))` when a job was claimed and run, `Ok(None)` when the
/// queue had nothing ready.
pub fn run_next_pending_job(
    pool: &Pool,
    now: DateTime<Utc>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<Option<DecayJobId>> {
    run_next_pending_job_with_attestation(pool, now, None, summary_backend)
}
64
65pub fn run_next_pending_job_with_attestation(
71 pool: &Pool,
72 now: DateTime<Utc>,
73 operator_attestation: Option<&Path>,
74 summary_backend: &dyn SummaryBackend,
75) -> DecayResult<Option<DecayJobId>> {
76 let repo = DecayJobRepo::new(pool);
77 let pending = repo.list_pending_ready(now)?;
78 let Some(record) = pending.into_iter().next() else {
79 log_stage("idle", None, "no pending decay jobs");
80 return Ok(None);
81 };
82 let id = record.id;
83 run_loaded_record(pool, record, now, operator_attestation, summary_backend)?;
84 Ok(Some(id))
85}
86
/// Runs the decay job identified by `id`, with no operator attestation.
///
/// Thin wrapper over [`run_specific_job_with_attestation`]; already-terminal
/// jobs are a logged no-op and an in-progress job is refused (see that
/// function for the full state handling).
pub fn run_specific_job(
    pool: &Pool,
    id: &DecayJobId,
    now: DateTime<Utc>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
    run_specific_job_with_attestation(pool, id, now, None, summary_backend)
}
105
106pub fn run_specific_job_with_attestation(
110 pool: &Pool,
111 id: &DecayJobId,
112 now: DateTime<Utc>,
113 operator_attestation: Option<&Path>,
114 summary_backend: &dyn SummaryBackend,
115) -> DecayResult<()> {
116 let repo = DecayJobRepo::new(pool);
117 let record = repo
118 .read(id)?
119 .ok_or_else(|| DecayError::Validation(format!("decay job {id} not found")))?;
120 let job: DecayJob =
121 record
122 .clone()
123 .try_into()
124 .map_err(|err: super::DecayJobConversionError| {
125 DecayError::Validation(format!("decay job {id} row malformed: {err}"))
126 })?;
127 match job.state {
128 DecayJobState::Completed { .. }
129 | DecayJobState::Failed { .. }
130 | DecayJobState::Cancelled => {
131 log_stage(
132 "skip_terminal",
133 Some(id),
134 &format!("job is already {}; no-op", job.state.state_wire()),
135 );
136 Ok(())
137 }
138 DecayJobState::InProgress => Err(DecayError::Validation(format!(
139 "decay job {id} is already in_progress; refusing to re-claim from another runner",
140 ))),
141 DecayJobState::Pending => {
142 run_loaded_record(pool, record, now, operator_attestation, summary_backend)
143 }
144 }
145}
146
/// Core claim → dispatch → settle loop for one decay job record.
///
/// The caller guarantees `record` is (or is assumed to be) a pending job.
/// The sequence is deliberate and order-sensitive:
///
/// 1. decode the row into a typed [`DecayJob`];
/// 2. persist the `pending -> in_progress` claim at `now`;
/// 3. dispatch by kind (compression runs in-process; expired-principle
///    review has no in-process work);
/// 4. settle the row to `completed` or `failed` and re-propagate any error.
///
/// # Errors
///
/// Returns [`DecayError::Validation`] for a malformed row; otherwise any
/// dispatch error is persisted as the failure reason and then returned.
fn run_loaded_record(
    pool: &Pool,
    record: DecayJobRecord,
    now: DateTime<Utc>,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<()> {
    // `try_into` consumes the record, but the raw row is still needed by the
    // LLM-summary dispatch path below — hence the clone.
    let job: DecayJob =
        record
            .clone()
            .try_into()
            .map_err(|err: super::DecayJobConversionError| {
                DecayError::Validation(format!("decay job {} row malformed: {err}", record.id))
            })?;
    let repo = DecayJobRepo::new(pool);

    // Claim: mark the job in_progress before doing any work, so another
    // runner observing the row will refuse to re-claim it.
    log_stage(
        "claim",
        Some(&job.id),
        "transitioning pending -> in_progress",
    );
    repo.update_state(&job.id, "in_progress", None, None, now)?;

    let dispatch_result = match &job.kind {
        DecayJobKind::CandidateCompression {
            source_memory_ids,
            summary_method,
        } => dispatch_candidate_compression(
            pool,
            &record,
            &job.id,
            source_memory_ids,
            summary_method,
            &job.created_by,
            operator_attestation,
            summary_backend,
        ),
        DecayJobKind::EpisodeCompression {
            source_episode_ids,
            summary_method,
        } => dispatch_episode_compression(
            pool,
            &record,
            &job.id,
            source_episode_ids,
            summary_method,
            &job.created_by,
            operator_attestation,
            summary_backend,
        ),
        DecayJobKind::ExpiredPrincipleReview { .. } => {
            log_stage(
                "dispatch",
                Some(&job.id),
                "kind=expired_principle_review (no in-process work; ceremony opened separately)",
            );
            Ok(DispatchOutcome::CompletedWithoutMemory)
        }
    };

    // NOTE(review): settlement uses wall-clock `Utc::now()` rather than the
    // injected `now` used for the claim — presumably so the settle timestamp
    // reflects real elapsed dispatch time, but it makes this step
    // non-deterministic under test clocks. Confirm this is intentional.
    let settle_now = Utc::now();
    match dispatch_result {
        Ok(DispatchOutcome::CompletedWithMemory(memory_id)) => {
            log_stage(
                "settle",
                Some(&job.id),
                &format!("compression ok; produced {memory_id}"),
            );
            repo.update_state(&job.id, "completed", None, Some(&memory_id), settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to completed");
            Ok(())
        }
        Ok(DispatchOutcome::CompletedWithoutMemory) => {
            log_stage(
                "settle",
                Some(&job.id),
                "ceremony opened; no memory produced",
            );
            repo.update_state(&job.id, "completed", None, None, settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to completed");
            Ok(())
        }
        Err(err) => {
            let reason = invariant_or_display(&err);
            log_stage(
                "settle",
                Some(&job.id),
                &format!("compression failed: {reason}"),
            );
            // Never persist an empty failure reason; fall back to a stable
            // sentinel so the row always explains itself.
            let stable_reason = if reason.trim().is_empty() {
                "decay.runner.failure".to_string()
            } else {
                reason
            };
            repo.update_state(&job.id, "failed", Some(&stable_reason), None, settle_now)?;
            log_stage("complete", Some(&job.id), "transitioned to failed");
            // The error is persisted AND propagated so callers see the failure.
            Err(err)
        }
    }
}
262
/// Result of a dispatch arm in `run_loaded_record`: compression kinds produce
/// a new memory id to persist on the job row; kinds whose real work happens
/// out of process (expired-principle review) complete without one.
enum DispatchOutcome {
    CompletedWithMemory(MemoryId),
    CompletedWithoutMemory,
}
267
268#[allow(clippy::too_many_arguments)]
269fn dispatch_candidate_compression(
270 pool: &Pool,
271 record: &DecayJobRecord,
272 job_id: &DecayJobId,
273 source_memory_ids: &[MemoryId],
274 summary_method: &SummaryMethod,
275 operator: &str,
276 operator_attestation: Option<&Path>,
277 summary_backend: &dyn SummaryBackend,
278) -> DecayResult<DispatchOutcome> {
279 match summary_method {
280 SummaryMethod::DeterministicConcatenate => {
281 log_stage(
282 "dispatch",
283 Some(job_id),
284 "kind=candidate_compression method=deterministic_concatenate",
285 );
286 let produced = compress::compress_candidate_memories_with_job(
287 pool,
288 source_memory_ids,
289 operator,
290 Some(job_id),
291 )?;
292 Ok(DispatchOutcome::CompletedWithMemory(produced))
293 }
294 SummaryMethod::LlmSummary { .. } => {
295 log_stage(
296 "dispatch",
297 Some(job_id),
298 "kind=candidate_compression method=llm_summary (operator-fired)",
299 );
300 let produced =
301 summary::run_llm_summary_job(pool, record, operator_attestation, summary_backend)?;
302 Ok(DispatchOutcome::CompletedWithMemory(produced))
303 }
304 }
305}
306
307#[allow(clippy::too_many_arguments)]
308fn dispatch_episode_compression(
309 pool: &Pool,
310 record: &DecayJobRecord,
311 job_id: &DecayJobId,
312 source_episode_ids: &[EpisodeId],
313 summary_method: &SummaryMethod,
314 operator: &str,
315 operator_attestation: Option<&Path>,
316 summary_backend: &dyn SummaryBackend,
317) -> DecayResult<DispatchOutcome> {
318 match summary_method {
319 SummaryMethod::DeterministicConcatenate => {
320 log_stage(
321 "dispatch",
322 Some(job_id),
323 "kind=episode_compression method=deterministic_concatenate",
324 );
325 let produced = compress::compress_episodes_with_job(
326 pool,
327 source_episode_ids,
328 operator,
329 Some(job_id),
330 )?;
331 Ok(DispatchOutcome::CompletedWithMemory(produced))
332 }
333 SummaryMethod::LlmSummary { .. } => {
334 log_stage(
335 "dispatch",
336 Some(job_id),
337 "kind=episode_compression method=llm_summary (operator-fired)",
338 );
339 let produced =
340 summary::run_llm_summary_job(pool, record, operator_attestation, summary_backend)?;
341 Ok(DispatchOutcome::CompletedWithMemory(produced))
342 }
343 }
344}
345
346fn invariant_or_display(err: &DecayError) -> String {
347 err.invariant()
348 .map(str::to_string)
349 .unwrap_or_else(|| err.to_string())
350}
351
352fn log_stage(stage: &str, job_id: Option<&DecayJobId>, message: &str) {
353 match job_id {
354 Some(id) => eprintln!("cortex_memory::decay::runner stage={stage} job={id} {message}"),
355 None => eprintln!("cortex_memory::decay::runner stage={stage} {message}"),
356 }
357}
358
// NOTE(review): currently unused scaffolding — a trivial projection of a JSON
// value to its string form (`Some` only for JSON strings). Kept alive via
// #[allow(dead_code)]; confirm whether a caller still needs it or it can be
// removed.
#[allow(dead_code)]
fn kind_wire_of(value: &Value) -> Option<&str> {
    value.as_str()
}
365
// Runner state-machine tests: each case seeds an in-memory store, enqueues a
// job row directly through the repo, then drives it via the public runner
// entry points and asserts on the persisted row.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use cortex_core::MemoryId;
    use cortex_llm::NoopSummaryBackend;
    use cortex_store::migrate::apply_pending;
    use cortex_store::repo::{MemoryCandidate, MemoryRepo};
    use rusqlite::Connection;
    use serde_json::Value;

    // Fresh in-memory store with all migrations applied.
    fn seed_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }

    // Deterministic test clock: fixed 2026-05-13T12:00:00Z base plus an offset.
    fn at(offset_seconds: i64) -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
            + chrono::Duration::seconds(offset_seconds)
    }

    // Inserts a minimal candidate-authority memory row and returns its id.
    fn insert_test_memory(pool: &Pool, claim: &str) -> MemoryId {
        let id = MemoryId::new();
        let candidate = MemoryCandidate {
            id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(vec![Value::String(
                "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV".into(),
            )]),
            domains_json: Value::Array(vec![Value::String("t".into())]),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence: 0.7,
            authority: "candidate".into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool).insert_candidate(&candidate).unwrap();
        id
    }

    // Enqueues a pending candidate-compression job using the deterministic
    // (in-process) summary method, scheduled at the clock base.
    fn enqueue_candidate_det_job(pool: &Pool, sources: &[MemoryId]) -> DecayJobId {
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: sources.to_vec(),
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            state: DecayJobState::Pending,
            scheduled_for: at(0),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(pool).insert(&record).unwrap();
        id
    }

    // Enqueues a pending candidate-compression job that requires an operator
    // attestation (LLM summary method).
    fn enqueue_candidate_llm_job(pool: &Pool, sources: &[MemoryId]) -> DecayJobId {
        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: sources.to_vec(),
                summary_method: SummaryMethod::LlmSummary {
                    operator_attestation_required: true,
                },
            },
            state: DecayJobState::Pending,
            scheduled_for: at(0),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(pool).insert(&record).unwrap();
        id
    }

    // Happy path: dequeue claims the job and the row ends completed with a
    // result memory id persisted.
    #[test]
    fn runner_transitions_pending_to_completed_atomically() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let m2 = insert_test_memory(&pool, "beta");
        let id = enqueue_candidate_det_job(&pool, &[m1, m2]);

        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("run ok");
        assert_eq!(ran, Some(id));

        let repo = DecayJobRepo::new(&pool);
        let record = repo.read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "completed");
        assert!(record.result_memory_id.is_some());
    }

    // A missing source memory makes dispatch fail; the error must propagate
    // AND the row must be settled to failed with a non-empty reason.
    #[test]
    fn runner_marks_failed_on_inner_error_with_invariant_reason() {
        let pool = seed_pool();
        let phantom = MemoryId::new();
        let id = enqueue_candidate_det_job(&pool, &[phantom]);

        let backend = NoopSummaryBackend;
        let err =
            run_next_pending_job(&pool, at(60), &backend).expect_err("inner error must propagate");
        match err {
            DecayError::Validation(msg) => {
                assert!(
                    msg.contains(super::super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT),
                    "msg: {msg}"
                );
            }
            other => panic!("expected Validation, got {other:?}"),
        }
        let record = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "failed");
        let reason = record.state_reason.expect("failure reason persisted");
        assert!(!reason.trim().is_empty());
    }

    // Re-running a completed job via run_specific_job is a no-op: the row is
    // byte-identical before and after.
    #[test]
    fn runner_completed_job_is_idempotent_on_re_run() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let m2 = insert_test_memory(&pool, "beta");
        let id = enqueue_candidate_det_job(&pool, &[m1, m2]);

        let backend = NoopSummaryBackend;
        run_next_pending_job(&pool, at(60), &backend).expect("first run ok");
        let before = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(before.state_wire, "completed");

        run_specific_job(&pool, &id, at(70), &backend).expect("re-run is a no-op");
        let after = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(after, before, "completed job must be untouched on re-run");
    }

    // Empty queue: dequeue returns Ok(None) rather than erroring.
    #[test]
    fn runner_idle_when_queue_empty() {
        let pool = seed_pool();
        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("idle ok");
        assert_eq!(ran, None);
    }

    // A job another runner has claimed (in_progress) must be refused with a
    // validation error, never re-claimed.
    #[test]
    fn runner_refuses_in_progress_re_claim() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = enqueue_candidate_det_job(&pool, &[m1]);
        DecayJobRepo::new(&pool)
            .update_state(&id, "in_progress", None, None, at(20))
            .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_specific_job(&pool, &id, at(30), &backend)
            .expect_err("in-progress job must refuse");
        assert!(matches!(err, DecayError::Validation(_)));
    }

    // LLM-summary jobs taken via the attestation-less dequeue path must fail
    // closed, and the row must record the attestation invariant as the reason.
    #[test]
    fn runner_llm_job_fails_closed_without_attestation_in_dequeue_path() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");
        let id = enqueue_candidate_llm_job(&pool, &[m1]);
        let backend = NoopSummaryBackend;
        let err =
            run_next_pending_job(&pool, at(60), &backend).expect_err("llm via dequeue must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryRequiresOperatorAttestation),
            "got {err:?}"
        );
        let record = DecayJobRepo::new(&pool).read(&id).unwrap().unwrap();
        assert_eq!(record.state_wire, "failed");
        assert_eq!(
            record.state_reason.as_deref(),
            Some(super::super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
        );
    }

    // scheduled_for gates dispatch: the job is invisible before its window
    // opens and runs once `now` passes it.
    #[test]
    fn runner_skips_when_scheduled_for_in_future() {
        let pool = seed_pool();
        let m1 = insert_test_memory(&pool, "alpha");

        let id = DecayJobId::new();
        let job = DecayJob {
            id,
            kind: DecayJobKind::CandidateCompression {
                source_memory_ids: vec![m1],
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            state: DecayJobState::Pending,
            scheduled_for: at(120),
            created_at: at(0),
            created_by: "operator:test".into(),
            updated_at: at(0),
        };
        let record: DecayJobRecord = job.into();
        DecayJobRepo::new(&pool).insert(&record).unwrap();

        let backend = NoopSummaryBackend;
        let ran = run_next_pending_job(&pool, at(60), &backend).expect("idle until window opens");
        assert_eq!(ran, None);

        let ran = run_next_pending_job(&pool, at(180), &backend).expect("dispatch ok");
        assert_eq!(ran, Some(id));
    }
}
591}