1pub mod compress;
31pub mod runner;
32pub mod summary;
33
34use std::error::Error;
35use std::fmt;
36
37use chrono::{DateTime, Utc};
38use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
39use cortex_store::repo::DecayJobRecord;
40use cortex_store::StoreError;
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43
44pub type DecayResult<T> = Result<T, DecayError>;
46
47#[derive(Debug)]
49pub enum DecayError {
50 Store(StoreError),
52 Validation(String),
55 LlmSummaryRequiresOperatorAttestation,
57 LlmSummaryAttestationRejected(String),
59 LlmSummaryBackendCallFailed(String),
65}
66
67pub const DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT: &str =
69 "decay.llm_summary.requires_operator_attestation";
70
71pub const DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT: &str =
73 "decay.llm_summary.attestation_rejected";
74
75pub const DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT: &str =
79 "decay.llm_summary.backend_call_failed";
80
81pub const DECAY_COMPRESS_INPUT_INVALID_INVARIANT: &str = "decay.compress.input_invalid";
84
85pub const DECAY_COMPRESS_SOURCE_MISSING_INVARIANT: &str = "decay.compress.source_missing";
88
89impl DecayError {
90 #[must_use]
92 pub fn invariant(&self) -> Option<&'static str> {
93 match self {
94 Self::LlmSummaryRequiresOperatorAttestation => {
95 Some(DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
96 }
97 Self::LlmSummaryAttestationRejected(_) => {
98 Some(DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT)
99 }
100 Self::LlmSummaryBackendCallFailed(_) => {
101 Some(DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT)
102 }
103 _ => None,
104 }
105 }
106}
107
108impl fmt::Display for DecayError {
109 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110 match self {
111 Self::Store(err) => write!(f, "store error: {err}"),
112 Self::Validation(message) => write!(f, "validation failed: {message}"),
113 Self::LlmSummaryRequiresOperatorAttestation => write!(
114 f,
115 "invariant={DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT} LLM summary refused: operator attestation is required (operator-fired only)"
116 ),
117 Self::LlmSummaryAttestationRejected(detail) => write!(
118 f,
119 "invariant={DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT} LLM summary attestation rejected: {detail}"
120 ),
121 Self::LlmSummaryBackendCallFailed(detail) => write!(
122 f,
123 "invariant={DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT} LLM summary backend call failed: {detail}"
124 ),
125 }
126 }
127}
128
129impl Error for DecayError {
130 fn source(&self) -> Option<&(dyn Error + 'static)> {
131 match self {
132 Self::Store(err) => Some(err),
133 _ => None,
134 }
135 }
136}
137
138impl From<StoreError> for DecayError {
139 fn from(err: StoreError) -> Self {
140 Self::Store(err)
141 }
142}
143
144pub const DECAY_SUMMARY_MAX_CLAIM_BYTES: usize = 4096;
149
150pub const DECAY_SUMMARY_CLAIM_SEPARATOR: &str = " | ";
153
154pub const DECAY_SUMMARY_TRUNCATION_SUFFIX: &str = "... [truncated]";
157
158pub const DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION: u16 = 1;
163
164pub const DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE: &str = "cortex.decay.llm_summary";
168
169#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
175#[serde(tag = "kind", rename_all = "snake_case")]
176pub enum DecayJobKind {
177 EpisodeCompression {
180 source_episode_ids: Vec<EpisodeId>,
182 summary_method: SummaryMethod,
184 },
185 CandidateCompression {
187 source_memory_ids: Vec<MemoryId>,
189 summary_method: SummaryMethod,
191 },
192 ExpiredPrincipleReview {
197 principle_id: PrincipleId,
199 },
200}
201
202impl DecayJobKind {
203 #[must_use]
205 pub const fn kind_wire(&self) -> &'static str {
206 match self {
207 Self::EpisodeCompression { .. } => "episode_compression",
208 Self::CandidateCompression { .. } => "candidate_compression",
209 Self::ExpiredPrincipleReview { .. } => "expired_principle_review",
210 }
211 }
212
213 #[must_use]
216 pub fn summary_method(&self) -> Option<&SummaryMethod> {
217 match self {
218 Self::EpisodeCompression { summary_method, .. }
219 | Self::CandidateCompression { summary_method, .. } => Some(summary_method),
220 Self::ExpiredPrincipleReview { .. } => None,
221 }
222 }
223}
224
225#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
232#[serde(tag = "method", rename_all = "snake_case")]
233pub enum SummaryMethod {
234 DeterministicConcatenate,
238 LlmSummary {
242 operator_attestation_required: bool,
248 },
249}
250
251impl SummaryMethod {
252 #[must_use]
258 pub const fn method_wire(&self) -> &'static str {
259 match self {
260 Self::DeterministicConcatenate => "deterministic_concatenate",
261 Self::LlmSummary { .. } => "llm_summary",
262 }
263 }
264}
265
266pub use cortex_store::repo::SUMMARY_METHOD_NONE_WIRE;
273
274#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
280#[serde(tag = "state", rename_all = "snake_case")]
281pub enum DecayJobState {
282 Pending,
284 InProgress,
286 Completed {
290 result_memory_id: Option<MemoryId>,
292 },
293 Failed {
296 reason: String,
298 },
299 Cancelled,
301}
302
303impl DecayJobState {
304 #[must_use]
306 pub const fn state_wire(&self) -> &'static str {
307 match self {
308 Self::Pending => "pending",
309 Self::InProgress => "in_progress",
310 Self::Completed { .. } => "completed",
311 Self::Failed { .. } => "failed",
312 Self::Cancelled => "cancelled",
313 }
314 }
315
316 #[must_use]
318 pub const fn is_terminal(&self) -> bool {
319 matches!(
320 self,
321 Self::Completed { .. } | Self::Failed { .. } | Self::Cancelled
322 )
323 }
324
325 #[must_use]
327 pub const fn is_scheduling_eligible(&self) -> bool {
328 matches!(self, Self::Pending)
329 }
330}
331
332#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
334pub struct DecayJob {
335 pub id: DecayJobId,
337 pub kind: DecayJobKind,
339 pub state: DecayJobState,
341 pub scheduled_for: DateTime<Utc>,
343 pub created_at: DateTime<Utc>,
345 pub created_by: String,
348 pub updated_at: DateTime<Utc>,
350}
351
352#[derive(Debug)]
357pub enum DecayJobConversionError {
358 UnknownKindWire(String),
360 UnknownSummaryMethodWire(String),
362 UnknownStateWire(String),
364 SummaryMethodKindMismatch {
366 kind_wire: String,
368 summary_method_wire: String,
370 },
371 InvalidSourceIdsJson(String),
373 InvalidId(cortex_core::CoreError),
375 MissingFailedReason,
377}
378
379impl fmt::Display for DecayJobConversionError {
380 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
381 match self {
382 Self::UnknownKindWire(wire) => write!(f, "unknown decay job kind wire `{wire}`"),
383 Self::UnknownSummaryMethodWire(wire) => {
384 write!(f, "unknown decay job summary method wire `{wire}`")
385 }
386 Self::UnknownStateWire(wire) => write!(f, "unknown decay job state wire `{wire}`"),
387 Self::SummaryMethodKindMismatch {
388 kind_wire,
389 summary_method_wire,
390 } => write!(
391 f,
392 "decay job kind `{kind_wire}` cannot persist summary_method `{summary_method_wire}`"
393 ),
394 Self::InvalidSourceIdsJson(msg) => {
395 write!(f, "decay job source_ids_json is malformed: {msg}")
396 }
397 Self::InvalidId(err) => write!(f, "decay job id parse error: {err}"),
398 Self::MissingFailedReason => write!(
399 f,
400 "decay job in `failed` state must carry a non-empty state_reason"
401 ),
402 }
403 }
404}
405
406impl Error for DecayJobConversionError {
407 fn source(&self) -> Option<&(dyn Error + 'static)> {
408 match self {
409 Self::InvalidId(err) => Some(err),
410 _ => None,
411 }
412 }
413}
414
415impl From<cortex_core::CoreError> for DecayJobConversionError {
416 fn from(err: cortex_core::CoreError) -> Self {
417 Self::InvalidId(err)
418 }
419}
420
421impl From<DecayJob> for DecayJobRecord {
422 fn from(job: DecayJob) -> Self {
423 let kind_wire = job.kind.kind_wire().to_string();
424 let (source_ids_json, summary_method_wire) = match &job.kind {
425 DecayJobKind::EpisodeCompression {
426 source_episode_ids,
427 summary_method,
428 } => (
429 Value::Array(
430 source_episode_ids
431 .iter()
432 .map(|id| Value::String(id.to_string()))
433 .collect(),
434 ),
435 summary_method.method_wire().to_string(),
436 ),
437 DecayJobKind::CandidateCompression {
438 source_memory_ids,
439 summary_method,
440 } => (
441 Value::Array(
442 source_memory_ids
443 .iter()
444 .map(|id| Value::String(id.to_string()))
445 .collect(),
446 ),
447 summary_method.method_wire().to_string(),
448 ),
449 DecayJobKind::ExpiredPrincipleReview { principle_id } => (
450 Value::Array(vec![Value::String(principle_id.to_string())]),
451 SUMMARY_METHOD_NONE_WIRE.to_string(),
452 ),
453 };
454 let (state_wire, state_reason, result_memory_id) = match &job.state {
455 DecayJobState::Pending | DecayJobState::InProgress | DecayJobState::Cancelled => {
456 (job.state.state_wire().to_string(), None, None)
457 }
458 DecayJobState::Completed { result_memory_id } => {
459 (job.state.state_wire().to_string(), None, *result_memory_id)
460 }
461 DecayJobState::Failed { reason } => (
462 job.state.state_wire().to_string(),
463 Some(reason.clone()),
464 None,
465 ),
466 };
467 DecayJobRecord {
468 id: job.id,
469 kind_wire,
470 summary_method_wire,
471 source_ids_json,
472 state_wire,
473 state_reason,
474 result_memory_id,
475 scheduled_for: job.scheduled_for,
476 created_at: job.created_at,
477 created_by: job.created_by,
478 updated_at: job.updated_at,
479 }
480 }
481}
482
483impl TryFrom<DecayJobRecord> for DecayJob {
484 type Error = DecayJobConversionError;
485
486 fn try_from(record: DecayJobRecord) -> Result<Self, Self::Error> {
487 let kind = parse_kind_from_record(
488 &record.kind_wire,
489 &record.summary_method_wire,
490 &record.source_ids_json,
491 )?;
492 let state = parse_state_from_record(
493 &record.state_wire,
494 record.state_reason,
495 record.result_memory_id,
496 )?;
497 Ok(DecayJob {
498 id: record.id,
499 kind,
500 state,
501 scheduled_for: record.scheduled_for,
502 created_at: record.created_at,
503 created_by: record.created_by,
504 updated_at: record.updated_at,
505 })
506 }
507}
508
509fn parse_kind_from_record(
510 kind_wire: &str,
511 summary_method_wire: &str,
512 source_ids_json: &Value,
513) -> Result<DecayJobKind, DecayJobConversionError> {
514 let array = source_ids_json.as_array().ok_or_else(|| {
515 DecayJobConversionError::InvalidSourceIdsJson("expected JSON array".into())
516 })?;
517 let strings = array
518 .iter()
519 .map(|v| {
520 v.as_str().map(str::to_string).ok_or_else(|| {
521 DecayJobConversionError::InvalidSourceIdsJson(
522 "all source id entries must be strings".into(),
523 )
524 })
525 })
526 .collect::<Result<Vec<_>, _>>()?;
527
528 match kind_wire {
529 "episode_compression" => {
530 let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
531 let source_episode_ids = strings
532 .iter()
533 .map(|s| {
534 s.parse::<EpisodeId>()
535 .map_err(DecayJobConversionError::from)
536 })
537 .collect::<Result<Vec<_>, _>>()?;
538 Ok(DecayJobKind::EpisodeCompression {
539 source_episode_ids,
540 summary_method,
541 })
542 }
543 "candidate_compression" => {
544 let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
545 let source_memory_ids = strings
546 .iter()
547 .map(|s| s.parse::<MemoryId>().map_err(DecayJobConversionError::from))
548 .collect::<Result<Vec<_>, _>>()?;
549 Ok(DecayJobKind::CandidateCompression {
550 source_memory_ids,
551 summary_method,
552 })
553 }
554 "expired_principle_review" => {
555 if summary_method_wire != SUMMARY_METHOD_NONE_WIRE {
556 return Err(DecayJobConversionError::SummaryMethodKindMismatch {
557 kind_wire: kind_wire.to_string(),
558 summary_method_wire: summary_method_wire.to_string(),
559 });
560 }
561 let principle_id = match strings.as_slice() {
562 [single] => single
563 .parse::<PrincipleId>()
564 .map_err(DecayJobConversionError::from)?,
565 other => {
566 return Err(DecayJobConversionError::InvalidSourceIdsJson(format!(
567 "expired_principle_review expected exactly one source id, got {}",
568 other.len()
569 )))
570 }
571 };
572 Ok(DecayJobKind::ExpiredPrincipleReview { principle_id })
573 }
574 other => Err(DecayJobConversionError::UnknownKindWire(other.to_string())),
575 }
576}
577
578fn parse_summary_method(
579 method_wire: &str,
580 kind_wire: &str,
581) -> Result<SummaryMethod, DecayJobConversionError> {
582 match method_wire {
583 "deterministic_concatenate" => Ok(SummaryMethod::DeterministicConcatenate),
584 "llm_summary" => Ok(SummaryMethod::LlmSummary {
585 operator_attestation_required: true,
586 }),
587 SUMMARY_METHOD_NONE_WIRE => Err(DecayJobConversionError::SummaryMethodKindMismatch {
588 kind_wire: kind_wire.to_string(),
589 summary_method_wire: method_wire.to_string(),
590 }),
591 other => Err(DecayJobConversionError::UnknownSummaryMethodWire(
592 other.to_string(),
593 )),
594 }
595}
596
597fn parse_state_from_record(
598 state_wire: &str,
599 state_reason: Option<String>,
600 result_memory_id: Option<MemoryId>,
601) -> Result<DecayJobState, DecayJobConversionError> {
602 match state_wire {
603 "pending" => Ok(DecayJobState::Pending),
604 "in_progress" => Ok(DecayJobState::InProgress),
605 "completed" => Ok(DecayJobState::Completed { result_memory_id }),
606 "failed" => {
607 let reason = state_reason.ok_or(DecayJobConversionError::MissingFailedReason)?;
608 if reason.trim().is_empty() {
609 return Err(DecayJobConversionError::MissingFailedReason);
610 }
611 Ok(DecayJobState::Failed { reason })
612 }
613 "cancelled" => Ok(DecayJobState::Cancelled),
614 other => Err(DecayJobConversionError::UnknownStateWire(other.to_string())),
615 }
616}
617
618#[cfg(test)]
619mod tests {
620 use super::*;
621 use chrono::TimeZone;
622
623 fn ts() -> DateTime<Utc> {
624 Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
625 }
626
627 #[test]
628 fn decay_job_id_is_stable_round_trip() {
629 let id = DecayJobId::new();
630 let s = id.to_string();
631 assert!(s.starts_with("dcy_"), "decay job id prefix: {s}");
632 let back: DecayJobId = s.parse().expect("round-trip parse");
633 assert_eq!(back, id);
634
635 let bad = format!("mem_{}", id.as_ulid());
637 assert!(bad.parse::<DecayJobId>().is_err());
638 }
639
640 #[test]
641 fn decay_job_state_transitions_are_well_typed() {
642 let pending = DecayJobState::Pending;
647 assert_eq!(pending.state_wire(), "pending");
648 assert!(!pending.is_terminal());
649 assert!(pending.is_scheduling_eligible());
650
651 let in_progress = DecayJobState::InProgress;
652 assert_eq!(in_progress.state_wire(), "in_progress");
653 assert!(!in_progress.is_terminal());
654 assert!(!in_progress.is_scheduling_eligible());
655
656 let completed = DecayJobState::Completed {
657 result_memory_id: Some(MemoryId::new()),
658 };
659 assert_eq!(completed.state_wire(), "completed");
660 assert!(completed.is_terminal());
661 assert!(!completed.is_scheduling_eligible());
662
663 let completed_review = DecayJobState::Completed {
664 result_memory_id: None,
665 };
666 assert_eq!(completed_review.state_wire(), "completed");
667 assert!(completed_review.is_terminal());
668
669 let failed = DecayJobState::Failed {
670 reason: "store unavailable".into(),
671 };
672 assert_eq!(failed.state_wire(), "failed");
673 assert!(failed.is_terminal());
674 assert!(!failed.is_scheduling_eligible());
675
676 let cancelled = DecayJobState::Cancelled;
677 assert_eq!(cancelled.state_wire(), "cancelled");
678 assert!(cancelled.is_terminal());
679 assert!(!cancelled.is_scheduling_eligible());
680 }
681
682 #[test]
683 fn summary_method_serializes_with_method_discriminator() {
684 let det = SummaryMethod::DeterministicConcatenate;
685 let det_json = serde_json::to_value(&det).expect("serialize deterministic");
686 assert_eq!(
687 det_json,
688 serde_json::json!({"method": "deterministic_concatenate"}),
689 );
690 let det_back: SummaryMethod =
691 serde_json::from_value(det_json).expect("deserialize deterministic");
692 assert_eq!(det_back, det);
693
694 let llm = SummaryMethod::LlmSummary {
695 operator_attestation_required: true,
696 };
697 let llm_json = serde_json::to_value(&llm).expect("serialize llm");
698 assert_eq!(
699 llm_json,
700 serde_json::json!({
701 "method": "llm_summary",
702 "operator_attestation_required": true,
703 }),
704 );
705 let llm_back: SummaryMethod = serde_json::from_value(llm_json).expect("deserialize llm");
706 assert_eq!(llm_back, llm);
707
708 assert_eq!(det.method_wire(), "deterministic_concatenate");
709 assert_eq!(llm.method_wire(), "llm_summary");
710 }
711
712 #[test]
713 fn decay_job_kind_wire_matches_migration_alphabet() {
714 let episode = DecayJobKind::EpisodeCompression {
715 source_episode_ids: vec![EpisodeId::new()],
716 summary_method: SummaryMethod::DeterministicConcatenate,
717 };
718 assert_eq!(episode.kind_wire(), "episode_compression");
719 assert!(episode.summary_method().is_some());
720
721 let candidate = DecayJobKind::CandidateCompression {
722 source_memory_ids: vec![MemoryId::new()],
723 summary_method: SummaryMethod::LlmSummary {
724 operator_attestation_required: true,
725 },
726 };
727 assert_eq!(candidate.kind_wire(), "candidate_compression");
728 assert!(candidate.summary_method().is_some());
729
730 let review = DecayJobKind::ExpiredPrincipleReview {
731 principle_id: PrincipleId::new(),
732 };
733 assert_eq!(review.kind_wire(), "expired_principle_review");
734 assert!(review.summary_method().is_none());
735 }
736
737 #[test]
738 fn decay_job_round_trips_through_json() {
739 let job = DecayJob {
740 id: DecayJobId::new(),
741 kind: DecayJobKind::CandidateCompression {
742 source_memory_ids: vec![MemoryId::new(), MemoryId::new()],
743 summary_method: SummaryMethod::DeterministicConcatenate,
744 },
745 state: DecayJobState::Pending,
746 scheduled_for: ts(),
747 created_at: ts(),
748 created_by: "operator:test".into(),
749 updated_at: ts(),
750 };
751 let bytes = serde_json::to_vec(&job).expect("serialize job");
752 let back: DecayJob = serde_json::from_slice(&bytes).expect("deserialize job");
753 assert_eq!(back, job);
754 }
755
756 #[test]
757 fn decay_job_round_trips_through_persistence_record() {
758 let job = DecayJob {
760 id: DecayJobId::new(),
761 kind: DecayJobKind::EpisodeCompression {
762 source_episode_ids: vec![EpisodeId::new(), EpisodeId::new()],
763 summary_method: SummaryMethod::DeterministicConcatenate,
764 },
765 state: DecayJobState::Pending,
766 scheduled_for: ts(),
767 created_at: ts(),
768 created_by: "operator:test".into(),
769 updated_at: ts(),
770 };
771 let record: DecayJobRecord = job.clone().into();
772 assert_eq!(record.kind_wire, "episode_compression");
773 assert_eq!(record.summary_method_wire, "deterministic_concatenate");
774 assert_eq!(record.state_wire, "pending");
775 let back: DecayJob = record.try_into().expect("record -> job");
776 assert_eq!(back, job);
777
778 let memory = MemoryId::new();
780 let job = DecayJob {
781 id: DecayJobId::new(),
782 kind: DecayJobKind::CandidateCompression {
783 source_memory_ids: vec![MemoryId::new()],
784 summary_method: SummaryMethod::LlmSummary {
785 operator_attestation_required: true,
786 },
787 },
788 state: DecayJobState::Completed {
789 result_memory_id: Some(memory),
790 },
791 scheduled_for: ts(),
792 created_at: ts(),
793 created_by: "operator:test".into(),
794 updated_at: ts(),
795 };
796 let record: DecayJobRecord = job.clone().into();
797 assert_eq!(record.summary_method_wire, "llm_summary");
798 assert_eq!(record.state_wire, "completed");
799 assert_eq!(record.result_memory_id, Some(memory));
800 let back: DecayJob = record.try_into().expect("record -> job");
801 assert_eq!(back, job);
802
803 let job = DecayJob {
805 id: DecayJobId::new(),
806 kind: DecayJobKind::ExpiredPrincipleReview {
807 principle_id: PrincipleId::new(),
808 },
809 state: DecayJobState::Failed {
810 reason: "operator absent".into(),
811 },
812 scheduled_for: ts(),
813 created_at: ts(),
814 created_by: "operator:test".into(),
815 updated_at: ts(),
816 };
817 let record: DecayJobRecord = job.clone().into();
818 assert_eq!(record.kind_wire, "expired_principle_review");
819 assert_eq!(record.summary_method_wire, SUMMARY_METHOD_NONE_WIRE);
820 assert_eq!(record.state_wire, "failed");
821 assert_eq!(record.state_reason.as_deref(), Some("operator absent"));
822 let back: DecayJob = record.try_into().expect("record -> job");
823 assert_eq!(back, job);
824 }
825
826 #[test]
827 fn decay_job_record_rejects_kind_summary_method_mismatch() {
828 let record = DecayJobRecord {
829 id: DecayJobId::new(),
830 kind_wire: "expired_principle_review".into(),
831 summary_method_wire: "deterministic_concatenate".into(),
832 source_ids_json: serde_json::json!([PrincipleId::new().to_string()]),
833 state_wire: "pending".into(),
834 state_reason: None,
835 result_memory_id: None,
836 scheduled_for: ts(),
837 created_at: ts(),
838 created_by: "operator:test".into(),
839 updated_at: ts(),
840 };
841 let err = DecayJob::try_from(record).expect_err("kind/method mismatch must fail");
842 assert!(matches!(
843 err,
844 DecayJobConversionError::SummaryMethodKindMismatch { .. }
845 ));
846 }
847}