pub mod compress;
pub mod runner;
pub mod summary;
use std::error::Error;
use std::fmt;
use chrono::{DateTime, Utc};
use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
use cortex_store::repo::DecayJobRecord;
use cortex_store::StoreError;
use serde::{Deserialize, Serialize};
use serde_json::Value;
/// Convenience alias for results whose error type is [`DecayError`].
pub type DecayResult<T> = Result<T, DecayError>;
/// Errors reported by the decay subsystem.
///
/// The three `LlmSummary*` variants each map to an invariant identifier via
/// [`DecayError::invariant`]; `Store` and `Validation` map to `None`.
#[derive(Debug)]
pub enum DecayError {
/// A storage failure, converted from [`StoreError`] and exposed through `Error::source`.
Store(StoreError),
/// A validation failure; the payload is the message shown after `validation failed: `.
Validation(String),
/// An LLM summary was refused because operator attestation is required.
LlmSummaryRequiresOperatorAttestation,
/// An LLM summary attestation was rejected; the payload is the detail text.
LlmSummaryAttestationRejected(String),
/// The LLM summary backend call failed; the payload is the detail text.
LlmSummaryBackendCallFailed(String),
}
/// Invariant identifier reported for [`DecayError::LlmSummaryRequiresOperatorAttestation`].
pub const DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT: &str =
"decay.llm_summary.requires_operator_attestation";
/// Invariant identifier reported for [`DecayError::LlmSummaryAttestationRejected`].
pub const DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT: &str =
"decay.llm_summary.attestation_rejected";
/// Invariant identifier reported for [`DecayError::LlmSummaryBackendCallFailed`].
pub const DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT: &str =
"decay.llm_summary.backend_call_failed";
/// Invariant identifier for invalid compression input.
// NOTE(review): not referenced in this file; presumably used by the `compress` submodule — confirm.
pub const DECAY_COMPRESS_INPUT_INVALID_INVARIANT: &str = "decay.compress.input_invalid";
/// Invariant identifier for a missing compression source.
// NOTE(review): not referenced in this file; presumably used by the `compress` submodule — confirm.
pub const DECAY_COMPRESS_SOURCE_MISSING_INVARIANT: &str = "decay.compress.source_missing";
impl DecayError {
    /// Returns the invariant identifier attached to this error, if it has one.
    ///
    /// Only the LLM-summary variants carry an identifier; `Store` and
    /// `Validation` errors yield `None`.
    #[must_use]
    pub fn invariant(&self) -> Option<&'static str> {
        // Every variant is listed so that adding a new one forces a decision here.
        let identifier = match self {
            Self::Store(_) | Self::Validation(_) => return None,
            Self::LlmSummaryRequiresOperatorAttestation => {
                DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT
            }
            Self::LlmSummaryAttestationRejected(_) => {
                DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT
            }
            Self::LlmSummaryBackendCallFailed(_) => DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT,
        };
        Some(identifier)
    }
}
impl fmt::Display for DecayError {
    /// Renders a human-readable message.
    ///
    /// LLM-summary variants are prefixed with `invariant=<identifier>` so the
    /// invariant can be recovered from the rendered text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Invariant-carrying variants.
            Self::LlmSummaryRequiresOperatorAttestation => write!(
                f,
                "invariant={DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT} LLM summary refused: operator attestation is required (operator-fired only)"
            ),
            Self::LlmSummaryAttestationRejected(detail) => write!(
                f,
                "invariant={DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT} LLM summary attestation rejected: {detail}"
            ),
            Self::LlmSummaryBackendCallFailed(detail) => write!(
                f,
                "invariant={DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT} LLM summary backend call failed: {detail}"
            ),
            // Plain variants without an invariant prefix.
            Self::Validation(message) => write!(f, "validation failed: {message}"),
            Self::Store(err) => write!(f, "store error: {err}"),
        }
    }
}
impl Error for DecayError {
    /// Exposes the wrapped [`StoreError`] as the underlying cause; all other
    /// variants have no source.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Store(err) => Some(err),
            Self::Validation(_)
            | Self::LlmSummaryRequiresOperatorAttestation
            | Self::LlmSummaryAttestationRejected(_)
            | Self::LlmSummaryBackendCallFailed(_) => None,
        }
    }
}
impl From<StoreError> for DecayError {
fn from(err: StoreError) -> Self {
Self::Store(err)
}
}
// NOTE(review): none of the constants below are referenced in this file; the
// descriptions are inferred from their names and values — confirm against the
// `summary` / `compress` submodules.
/// Upper bound, in bytes, for a summary claim (presumably enforced by the summary code).
pub const DECAY_SUMMARY_MAX_CLAIM_BYTES: usize = 4096;
/// Separator string, presumably placed between claims when they are concatenated.
pub const DECAY_SUMMARY_CLAIM_SEPARATOR: &str = " | ";
/// Suffix string, presumably appended to a claim that was truncated.
pub const DECAY_SUMMARY_TRUNCATION_SUFFIX: &str = "... [truncated]";
/// Schema version number for the LLM summary attestation.
pub const DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION: u16 = 1;
/// Purpose string for the LLM summary attestation.
pub const DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE: &str = "cortex.decay.llm_summary";
/// The kind of work a decay job performs, together with its inputs.
///
/// Serialized by serde as an internally tagged enum: the `kind` key holds the
/// snake_case variant name and the variant's fields sit alongside it.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum DecayJobKind {
/// Compression over a set of episodes.
EpisodeCompression {
/// Ids of the episodes used as input.
source_episode_ids: Vec<EpisodeId>,
/// How the summary is produced.
summary_method: SummaryMethod,
},
/// Compression over a set of memories.
CandidateCompression {
/// Ids of the memories used as input.
source_memory_ids: Vec<MemoryId>,
/// How the summary is produced.
summary_method: SummaryMethod,
},
/// Review of a single principle; this kind has no summary method.
ExpiredPrincipleReview {
/// Id of the principle under review.
principle_id: PrincipleId,
},
}
impl DecayJobKind {
    /// Returns the string stored in the persistence record's `kind_wire` field
    /// for this kind.
    #[must_use]
    pub const fn kind_wire(&self) -> &'static str {
        match self {
            Self::ExpiredPrincipleReview { .. } => "expired_principle_review",
            Self::CandidateCompression { .. } => "candidate_compression",
            Self::EpisodeCompression { .. } => "episode_compression",
        }
    }

    /// Returns the summary method for the two compression kinds, or `None`
    /// for a principle review, which has no summary step.
    #[must_use]
    pub fn summary_method(&self) -> Option<&SummaryMethod> {
        match self {
            Self::ExpiredPrincipleReview { .. } => None,
            Self::EpisodeCompression { summary_method, .. } => Some(summary_method),
            Self::CandidateCompression { summary_method, .. } => Some(summary_method),
        }
    }
}
/// How a compression job produces its summary.
///
/// Serialized by serde as an internally tagged enum: the `method` key holds
/// the snake_case variant name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum SummaryMethod {
/// Wire name `deterministic_concatenate`.
DeterministicConcatenate,
/// Wire name `llm_summary`.
LlmSummary {
/// Whether operator attestation is required.
///
/// The persistence record stores only the method name, so a job rebuilt
/// from a record always has this set to `true`.
operator_attestation_required: bool,
},
}
impl SummaryMethod {
    /// Returns the string stored in the persistence record's
    /// `summary_method_wire` field for this method.
    #[must_use]
    pub const fn method_wire(&self) -> &'static str {
        match self {
            Self::LlmSummary { .. } => "llm_summary",
            Self::DeterministicConcatenate => "deterministic_concatenate",
        }
    }
}
pub use cortex_store::repo::SUMMARY_METHOD_NONE_WIRE;
/// Lifecycle state of a decay job.
///
/// Serialized by serde as an internally tagged enum: the `state` key holds
/// the snake_case variant name.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum DecayJobState {
/// Not yet started; the only state that is scheduling-eligible.
Pending,
/// Currently running; neither terminal nor scheduling-eligible.
InProgress,
/// Finished successfully (terminal).
Completed {
/// Memory produced by the job, if any.
result_memory_id: Option<MemoryId>,
},
/// Finished with an error (terminal).
Failed {
/// Why the job failed. A record loaded from persistence is rejected
/// when this is missing or blank.
reason: String,
},
/// Cancelled (terminal).
Cancelled,
}
impl DecayJobState {
    /// Returns the string stored in the persistence record's `state_wire`
    /// field for this state.
    #[must_use]
    pub const fn state_wire(&self) -> &'static str {
        match self {
            Self::Cancelled => "cancelled",
            Self::Failed { .. } => "failed",
            Self::Completed { .. } => "completed",
            Self::InProgress => "in_progress",
            Self::Pending => "pending",
        }
    }

    /// Returns `true` for `Completed`, `Failed` and `Cancelled`.
    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        match self {
            Self::Pending | Self::InProgress => false,
            Self::Completed { .. } | Self::Failed { .. } | Self::Cancelled => true,
        }
    }

    /// Returns `true` only for `Pending`.
    #[must_use]
    pub const fn is_scheduling_eligible(&self) -> bool {
        match self {
            Self::Pending => true,
            Self::InProgress
            | Self::Completed { .. }
            | Self::Failed { .. }
            | Self::Cancelled => false,
        }
    }
}
/// A decay job in its typed form: identity, kind, lifecycle state and timestamps.
///
/// Converts into a [`DecayJobRecord`] via `From`, and back via `TryFrom`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DecayJob {
/// Unique id of the job.
pub id: DecayJobId,
/// What the job does and which inputs it operates on.
pub kind: DecayJobKind,
/// Current lifecycle state.
pub state: DecayJobState,
/// Time the job is scheduled for.
pub scheduled_for: DateTime<Utc>,
/// Creation timestamp.
pub created_at: DateTime<Utc>,
/// Identifier of the creator (the tests use values such as `operator:test`).
pub created_by: String,
/// Timestamp of the latest update.
pub updated_at: DateTime<Utc>,
}
/// Reasons a [`DecayJobRecord`] cannot be converted into a [`DecayJob`].
#[derive(Debug)]
pub enum DecayJobConversionError {
/// The record's `kind_wire` is not a known kind; the payload is the offending value.
UnknownKindWire(String),
/// The record's `summary_method_wire` is not a known method; the payload is the offending value.
UnknownSummaryMethodWire(String),
/// The record's `state_wire` is not a known state; the payload is the offending value.
UnknownStateWire(String),
/// The summary method is not allowed for the job kind: a compression kind
/// with the "none" method, or a principle review with any other method.
SummaryMethodKindMismatch {
/// The kind wire value from the record.
kind_wire: String,
/// The summary method wire value from the record.
summary_method_wire: String,
},
/// `source_ids_json` is not an array of strings, or has the wrong number
/// of entries for the kind; the payload describes the problem.
InvalidSourceIdsJson(String),
/// A source id string failed to parse into its typed id.
InvalidId(cortex_core::CoreError),
/// The state is `failed` but `state_reason` is missing or blank.
MissingFailedReason,
}
impl fmt::Display for DecayJobConversionError {
    /// Renders a one-line description of the conversion failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Fixed text needs no formatting machinery.
            Self::MissingFailedReason => {
                f.write_str("decay job in `failed` state must carry a non-empty state_reason")
            }
            Self::InvalidId(err) => write!(f, "decay job id parse error: {err}"),
            Self::InvalidSourceIdsJson(msg) => {
                write!(f, "decay job source_ids_json is malformed: {msg}")
            }
            Self::SummaryMethodKindMismatch {
                kind_wire,
                summary_method_wire,
            } => write!(
                f,
                "decay job kind `{kind_wire}` cannot persist summary_method `{summary_method_wire}`"
            ),
            Self::UnknownStateWire(wire) => write!(f, "unknown decay job state wire `{wire}`"),
            Self::UnknownSummaryMethodWire(wire) => {
                write!(f, "unknown decay job summary method wire `{wire}`")
            }
            Self::UnknownKindWire(wire) => write!(f, "unknown decay job kind wire `{wire}`"),
        }
    }
}
impl Error for DecayJobConversionError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
Self::InvalidId(err) => Some(err),
_ => None,
}
}
}
impl From<cortex_core::CoreError> for DecayJobConversionError {
fn from(err: cortex_core::CoreError) -> Self {
Self::InvalidId(err)
}
}
impl From<DecayJob> for DecayJobRecord {
fn from(job: DecayJob) -> Self {
let kind_wire = job.kind.kind_wire().to_string();
let (source_ids_json, summary_method_wire) = match &job.kind {
DecayJobKind::EpisodeCompression {
source_episode_ids,
summary_method,
} => (
Value::Array(
source_episode_ids
.iter()
.map(|id| Value::String(id.to_string()))
.collect(),
),
summary_method.method_wire().to_string(),
),
DecayJobKind::CandidateCompression {
source_memory_ids,
summary_method,
} => (
Value::Array(
source_memory_ids
.iter()
.map(|id| Value::String(id.to_string()))
.collect(),
),
summary_method.method_wire().to_string(),
),
DecayJobKind::ExpiredPrincipleReview { principle_id } => (
Value::Array(vec![Value::String(principle_id.to_string())]),
SUMMARY_METHOD_NONE_WIRE.to_string(),
),
};
let (state_wire, state_reason, result_memory_id) = match &job.state {
DecayJobState::Pending | DecayJobState::InProgress | DecayJobState::Cancelled => {
(job.state.state_wire().to_string(), None, None)
}
DecayJobState::Completed { result_memory_id } => {
(job.state.state_wire().to_string(), None, *result_memory_id)
}
DecayJobState::Failed { reason } => (
job.state.state_wire().to_string(),
Some(reason.clone()),
None,
),
};
DecayJobRecord {
id: job.id,
kind_wire,
summary_method_wire,
source_ids_json,
state_wire,
state_reason,
result_memory_id,
scheduled_for: job.scheduled_for,
created_at: job.created_at,
created_by: job.created_by,
updated_at: job.updated_at,
}
}
}
impl TryFrom<DecayJobRecord> for DecayJob {
type Error = DecayJobConversionError;
fn try_from(record: DecayJobRecord) -> Result<Self, Self::Error> {
let kind = parse_kind_from_record(
&record.kind_wire,
&record.summary_method_wire,
&record.source_ids_json,
)?;
let state = parse_state_from_record(
&record.state_wire,
record.state_reason,
record.result_memory_id,
)?;
Ok(DecayJob {
id: record.id,
kind,
state,
scheduled_for: record.scheduled_for,
created_at: record.created_at,
created_by: record.created_by,
updated_at: record.updated_at,
})
}
}
/// Rebuilds a [`DecayJobKind`] from the flat wire fields of a record.
///
/// Validation order:
/// 1. `source_ids_json` must be a JSON array whose entries are all strings
///    (checked before the kind is even looked at);
/// 2. the kind is dispatched on `kind_wire`;
/// 3. for compression kinds the summary method is parsed, then the ids;
///    a principle review requires the "none" summary method and exactly one id.
///
/// # Errors
///
/// Returns `InvalidSourceIdsJson`, `SummaryMethodKindMismatch`,
/// `UnknownSummaryMethodWire`, `InvalidId` or `UnknownKindWire` depending on
/// which check fails first.
fn parse_kind_from_record(
    kind_wire: &str,
    summary_method_wire: &str,
    source_ids_json: &Value,
) -> Result<DecayJobKind, DecayJobConversionError> {
    /// Parses every raw id into `T`, stopping at the first failure.
    fn parse_ids<T>(raw_ids: &[&str]) -> Result<Vec<T>, DecayJobConversionError>
    where
        T: std::str::FromStr,
        DecayJobConversionError: From<T::Err>,
    {
        raw_ids
            .iter()
            .map(|raw| raw.parse::<T>().map_err(DecayJobConversionError::from))
            .collect()
    }

    let entries = match source_ids_json.as_array() {
        Some(entries) => entries,
        None => {
            return Err(DecayJobConversionError::InvalidSourceIdsJson(
                "expected JSON array".into(),
            ))
        }
    };

    // Borrow the id text straight out of the JSON value.
    let mut raw_ids: Vec<&str> = Vec::with_capacity(entries.len());
    for entry in entries {
        match entry.as_str() {
            Some(text) => raw_ids.push(text),
            None => {
                return Err(DecayJobConversionError::InvalidSourceIdsJson(
                    "all source id entries must be strings".into(),
                ))
            }
        }
    }

    match kind_wire {
        "episode_compression" => {
            // The summary method is validated before the ids.
            let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
            let source_episode_ids = parse_ids::<EpisodeId>(&raw_ids)?;
            Ok(DecayJobKind::EpisodeCompression {
                source_episode_ids,
                summary_method,
            })
        }
        "candidate_compression" => {
            let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
            let source_memory_ids = parse_ids::<MemoryId>(&raw_ids)?;
            Ok(DecayJobKind::CandidateCompression {
                source_memory_ids,
                summary_method,
            })
        }
        "expired_principle_review" => {
            // A review has no summary step, so only the "none" wire value is valid.
            if summary_method_wire != SUMMARY_METHOD_NONE_WIRE {
                return Err(DecayJobConversionError::SummaryMethodKindMismatch {
                    kind_wire: kind_wire.to_string(),
                    summary_method_wire: summary_method_wire.to_string(),
                });
            }
            let principle_id = if let [only] = raw_ids.as_slice() {
                only.parse::<PrincipleId>()?
            } else {
                return Err(DecayJobConversionError::InvalidSourceIdsJson(format!(
                    "expired_principle_review expected exactly one source id, got {}",
                    raw_ids.len()
                )));
            };
            Ok(DecayJobKind::ExpiredPrincipleReview { principle_id })
        }
        unknown => Err(DecayJobConversionError::UnknownKindWire(
            unknown.to_string(),
        )),
    }
}
fn parse_summary_method(
method_wire: &str,
kind_wire: &str,
) -> Result<SummaryMethod, DecayJobConversionError> {
match method_wire {
"deterministic_concatenate" => Ok(SummaryMethod::DeterministicConcatenate),
"llm_summary" => Ok(SummaryMethod::LlmSummary {
operator_attestation_required: true,
}),
SUMMARY_METHOD_NONE_WIRE => Err(DecayJobConversionError::SummaryMethodKindMismatch {
kind_wire: kind_wire.to_string(),
summary_method_wire: method_wire.to_string(),
}),
other => Err(DecayJobConversionError::UnknownSummaryMethodWire(
other.to_string(),
)),
}
}
/// Rebuilds a [`DecayJobState`] from the flat wire fields of a record.
///
/// `result_memory_id` is used only for `completed`, and `state_reason` only
/// for `failed`; for every other state they are ignored.
///
/// # Errors
///
/// Returns `MissingFailedReason` when the state is `failed` and the reason is
/// absent or blank after trimming, and `UnknownStateWire` for an
/// unrecognised state.
fn parse_state_from_record(
    state_wire: &str,
    state_reason: Option<String>,
    result_memory_id: Option<MemoryId>,
) -> Result<DecayJobState, DecayJobConversionError> {
    let state = match state_wire {
        "pending" => DecayJobState::Pending,
        "in_progress" => DecayJobState::InProgress,
        "cancelled" => DecayJobState::Cancelled,
        "completed" => DecayJobState::Completed { result_memory_id },
        "failed" => match state_reason {
            // The reason is stored untrimmed; trimming is only for the blank check.
            Some(reason) if !reason.trim().is_empty() => DecayJobState::Failed { reason },
            _ => return Err(DecayJobConversionError::MissingFailedReason),
        },
        unknown => {
            return Err(DecayJobConversionError::UnknownStateWire(
                unknown.to_string(),
            ))
        }
    };
    Ok(state)
}
// Unit tests pinning the wire names, serde shapes and record round-trips of
// the decay job types.
#[cfg(test)]
mod tests {
use super::*;
use chrono::TimeZone;
// Fixed timestamp so that jobs built in different tests compare equal.
fn ts() -> DateTime<Utc> {
Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
}
// A decay job id renders with the `dcy_` prefix, parses back to the same
// value, and rejects a string carrying a different prefix.
#[test]
fn decay_job_id_is_stable_round_trip() {
let id = DecayJobId::new();
let s = id.to_string();
assert!(s.starts_with("dcy_"), "decay job id prefix: {s}");
let back: DecayJobId = s.parse().expect("round-trip parse");
assert_eq!(back, id);
let bad = format!("mem_{}", id.as_ulid());
assert!(bad.parse::<DecayJobId>().is_err());
}
// Checks the wire name, terminal flag and scheduling eligibility of every state.
#[test]
fn decay_job_state_transitions_are_well_typed() {
let pending = DecayJobState::Pending;
assert_eq!(pending.state_wire(), "pending");
assert!(!pending.is_terminal());
assert!(pending.is_scheduling_eligible());
let in_progress = DecayJobState::InProgress;
assert_eq!(in_progress.state_wire(), "in_progress");
assert!(!in_progress.is_terminal());
assert!(!in_progress.is_scheduling_eligible());
let completed = DecayJobState::Completed {
result_memory_id: Some(MemoryId::new()),
};
assert_eq!(completed.state_wire(), "completed");
assert!(completed.is_terminal());
assert!(!completed.is_scheduling_eligible());
// A completed job may legitimately carry no result memory.
let completed_review = DecayJobState::Completed {
result_memory_id: None,
};
assert_eq!(completed_review.state_wire(), "completed");
assert!(completed_review.is_terminal());
let failed = DecayJobState::Failed {
reason: "store unavailable".into(),
};
assert_eq!(failed.state_wire(), "failed");
assert!(failed.is_terminal());
assert!(!failed.is_scheduling_eligible());
let cancelled = DecayJobState::Cancelled;
assert_eq!(cancelled.state_wire(), "cancelled");
assert!(cancelled.is_terminal());
assert!(!cancelled.is_scheduling_eligible());
}
// The serde form of `SummaryMethod` is tagged by a `method` key and
// round-trips; `method_wire` agrees with the serialized tag.
#[test]
fn summary_method_serializes_with_method_discriminator() {
let det = SummaryMethod::DeterministicConcatenate;
let det_json = serde_json::to_value(&det).expect("serialize deterministic");
assert_eq!(
det_json,
serde_json::json!({"method": "deterministic_concatenate"}),
);
let det_back: SummaryMethod =
serde_json::from_value(det_json).expect("deserialize deterministic");
assert_eq!(det_back, det);
let llm = SummaryMethod::LlmSummary {
operator_attestation_required: true,
};
let llm_json = serde_json::to_value(&llm).expect("serialize llm");
assert_eq!(
llm_json,
serde_json::json!({
"method": "llm_summary",
"operator_attestation_required": true,
}),
);
let llm_back: SummaryMethod = serde_json::from_value(llm_json).expect("deserialize llm");
assert_eq!(llm_back, llm);
assert_eq!(det.method_wire(), "deterministic_concatenate");
assert_eq!(llm.method_wire(), "llm_summary");
}
// Each kind reports its expected wire name; only the compression kinds
// expose a summary method.
#[test]
fn decay_job_kind_wire_matches_migration_alphabet() {
let episode = DecayJobKind::EpisodeCompression {
source_episode_ids: vec![EpisodeId::new()],
summary_method: SummaryMethod::DeterministicConcatenate,
};
assert_eq!(episode.kind_wire(), "episode_compression");
assert!(episode.summary_method().is_some());
let candidate = DecayJobKind::CandidateCompression {
source_memory_ids: vec![MemoryId::new()],
summary_method: SummaryMethod::LlmSummary {
operator_attestation_required: true,
},
};
assert_eq!(candidate.kind_wire(), "candidate_compression");
assert!(candidate.summary_method().is_some());
let review = DecayJobKind::ExpiredPrincipleReview {
principle_id: PrincipleId::new(),
};
assert_eq!(review.kind_wire(), "expired_principle_review");
assert!(review.summary_method().is_none());
}
// A whole job survives a serde JSON round-trip unchanged.
#[test]
fn decay_job_round_trips_through_json() {
let job = DecayJob {
id: DecayJobId::new(),
kind: DecayJobKind::CandidateCompression {
source_memory_ids: vec![MemoryId::new(), MemoryId::new()],
summary_method: SummaryMethod::DeterministicConcatenate,
},
state: DecayJobState::Pending,
scheduled_for: ts(),
created_at: ts(),
created_by: "operator:test".into(),
updated_at: ts(),
};
let bytes = serde_json::to_vec(&job).expect("serialize job");
let back: DecayJob = serde_json::from_slice(&bytes).expect("deserialize job");
assert_eq!(back, job);
}
// Job -> record -> job is lossless for one job of each kind, covering the
// pending, completed and failed states.
#[test]
fn decay_job_round_trips_through_persistence_record() {
// Case 1: episode compression, deterministic method, pending.
let job = DecayJob {
id: DecayJobId::new(),
kind: DecayJobKind::EpisodeCompression {
source_episode_ids: vec![EpisodeId::new(), EpisodeId::new()],
summary_method: SummaryMethod::DeterministicConcatenate,
},
state: DecayJobState::Pending,
scheduled_for: ts(),
created_at: ts(),
created_by: "operator:test".into(),
updated_at: ts(),
};
let record: DecayJobRecord = job.clone().into();
assert_eq!(record.kind_wire, "episode_compression");
assert_eq!(record.summary_method_wire, "deterministic_concatenate");
assert_eq!(record.state_wire, "pending");
let back: DecayJob = record.try_into().expect("record -> job");
assert_eq!(back, job);
// Case 2: candidate compression, LLM summary, completed with a result memory.
let memory = MemoryId::new();
let job = DecayJob {
id: DecayJobId::new(),
kind: DecayJobKind::CandidateCompression {
source_memory_ids: vec![MemoryId::new()],
summary_method: SummaryMethod::LlmSummary {
operator_attestation_required: true,
},
},
state: DecayJobState::Completed {
result_memory_id: Some(memory),
},
scheduled_for: ts(),
created_at: ts(),
created_by: "operator:test".into(),
updated_at: ts(),
};
let record: DecayJobRecord = job.clone().into();
assert_eq!(record.summary_method_wire, "llm_summary");
assert_eq!(record.state_wire, "completed");
assert_eq!(record.result_memory_id, Some(memory));
let back: DecayJob = record.try_into().expect("record -> job");
assert_eq!(back, job);
// Case 3: principle review (no summary method), failed with a reason.
let job = DecayJob {
id: DecayJobId::new(),
kind: DecayJobKind::ExpiredPrincipleReview {
principle_id: PrincipleId::new(),
},
state: DecayJobState::Failed {
reason: "operator absent".into(),
},
scheduled_for: ts(),
created_at: ts(),
created_by: "operator:test".into(),
updated_at: ts(),
};
let record: DecayJobRecord = job.clone().into();
assert_eq!(record.kind_wire, "expired_principle_review");
assert_eq!(record.summary_method_wire, SUMMARY_METHOD_NONE_WIRE);
assert_eq!(record.state_wire, "failed");
assert_eq!(record.state_reason.as_deref(), Some("operator absent"));
let back: DecayJob = record.try_into().expect("record -> job");
assert_eq!(back, job);
}
// A principle-review record that carries a real summary method must be rejected.
#[test]
fn decay_job_record_rejects_kind_summary_method_mismatch() {
let record = DecayJobRecord {
id: DecayJobId::new(),
kind_wire: "expired_principle_review".into(),
summary_method_wire: "deterministic_concatenate".into(),
source_ids_json: serde_json::json!([PrincipleId::new().to_string()]),
state_wire: "pending".into(),
state_reason: None,
result_memory_id: None,
scheduled_for: ts(),
created_at: ts(),
created_by: "operator:test".into(),
updated_at: ts(),
};
let err = DecayJob::try_from(record).expect_err("kind/method mismatch must fail");
assert!(matches!(
err,
DecayJobConversionError::SummaryMethodKindMismatch { .. }
));
}
}