Skip to main content

cortex_memory/decay/
mod.rs

1//! Phase 4.D decay job substrate.
2//!
3//! Decay jobs are operator-fired scheduled tasks that compress or summarize
4//! OLD memory rows (and operator-driven re-promotion reviews of expired
5//! principle waivers). They do NOT touch the immutable, hash-chained event
6//! log: the Phase 4.D guardrail is that raw events and the hash chain remain
7//! unchanged. Compression operates one level up, on the semantic memory
8//! surface and on candidate principles whose policy waiver has expired.
9//!
10//! Provenance is preserved: every summary memory landed by a decay job
11//! retains pointers back to its source memory ids and (transitively) the
12//! source event ids those memories already carry, via the same
13//! `source_episodes_json` / `source_events_json` fields existing memory
14//! candidates use. A summary is therefore a re-shaping of *already-attested*
15//! content rather than a new claim with weaker lineage.
16//!
17//! This module owns the **types** (state machine, kind discriminator, summary
18//! method enum) and the conversion shims used by the persistence layer in
19//! `cortex-store::repo::decay_jobs`. It does NOT own:
20//!
21//! - The actual compression / summarization implementation (Phase 4.D D3-B).
22//! - The CLI surface that operators use to fire jobs (Phase 4.D D3-C).
23//! - Any retrieval-side rewiring (Phase 4.D2).
24//!
25//! Downstream agents land on the types defined here. The wire forms produced
26//! by [`DecayJobKind::kind_wire`], [`SummaryMethod::method_wire`], and
27//! [`DecayJobState::state_wire`] are part of the persistence contract — they
28//! match the `CHECK` constraints in `migrations/008_decay_jobs.sql`.
29
30pub mod compress;
31pub mod runner;
32pub mod summary;
33
34use std::error::Error;
35use std::fmt;
36
37use chrono::{DateTime, Utc};
38use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
39use cortex_store::repo::DecayJobRecord;
40use cortex_store::StoreError;
41use serde::{Deserialize, Serialize};
42use serde_json::Value;
43
44/// Result alias for the decay module.
45pub type DecayResult<T> = Result<T, DecayError>;
46
47/// Errors raised by the Phase 4.D decay path.
48#[derive(Debug)]
49pub enum DecayError {
50    /// Underlying store boundary failed.
51    Store(StoreError),
52    /// Caller supplied an invalid job input (empty source set, mixed id
53    /// space, source not found, etc.).
54    Validation(String),
55    /// LLM summary refused: no operator attestation supplied.
56    LlmSummaryRequiresOperatorAttestation,
57    /// LLM summary attestation was malformed / failed verification.
58    LlmSummaryAttestationRejected(String),
59    /// LLM summary refused: the configured [`cortex_llm::SummaryBackend`]
60    /// rejected the call (model not on allowlist, prompt template
61    /// digest mismatch, upstream call failed, output validation failed,
62    /// or the noop default backend is wired). The string carries the
63    /// backend's typed reason for grep-friendly operator scripts.
64    LlmSummaryBackendCallFailed(String),
65}
66
67/// Stable invariant surfaced by [`DecayError::LlmSummaryRequiresOperatorAttestation`].
68pub const DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT: &str =
69    "decay.llm_summary.requires_operator_attestation";
70
71/// Stable invariant surfaced by [`DecayError::LlmSummaryAttestationRejected`].
72pub const DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT: &str =
73    "decay.llm_summary.attestation_rejected";
74
75/// Stable invariant surfaced by [`DecayError::LlmSummaryBackendCallFailed`].
76/// Operator scripts that grep for this token can distinguish a
77/// backend-side refusal from an attestation-shape refusal.
78pub const DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT: &str =
79    "decay.llm_summary.backend_call_failed";
80
81/// Stable invariant surfaced when the runner refuses to compress an empty
82/// source set or a heterogeneous (mixed memory + episode) source set.
83pub const DECAY_COMPRESS_INPUT_INVALID_INVARIANT: &str = "decay.compress.input_invalid";
84
85/// Stable invariant surfaced when the runner cannot find one of the source
86/// rows named by a decay job.
87pub const DECAY_COMPRESS_SOURCE_MISSING_INVARIANT: &str = "decay.compress.source_missing";
88
89impl DecayError {
90    /// Stable invariant name for this error variant when one is defined.
91    #[must_use]
92    pub fn invariant(&self) -> Option<&'static str> {
93        match self {
94            Self::LlmSummaryRequiresOperatorAttestation => {
95                Some(DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
96            }
97            Self::LlmSummaryAttestationRejected(_) => {
98                Some(DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT)
99            }
100            Self::LlmSummaryBackendCallFailed(_) => {
101                Some(DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT)
102            }
103            _ => None,
104        }
105    }
106}
107
108impl fmt::Display for DecayError {
109    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
110        match self {
111            Self::Store(err) => write!(f, "store error: {err}"),
112            Self::Validation(message) => write!(f, "validation failed: {message}"),
113            Self::LlmSummaryRequiresOperatorAttestation => write!(
114                f,
115                "invariant={DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT} LLM summary refused: operator attestation is required (operator-fired only)"
116            ),
117            Self::LlmSummaryAttestationRejected(detail) => write!(
118                f,
119                "invariant={DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT} LLM summary attestation rejected: {detail}"
120            ),
121            Self::LlmSummaryBackendCallFailed(detail) => write!(
122                f,
123                "invariant={DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT} LLM summary backend call failed: {detail}"
124            ),
125        }
126    }
127}
128
129impl Error for DecayError {
130    fn source(&self) -> Option<&(dyn Error + 'static)> {
131        match self {
132            Self::Store(err) => Some(err),
133            _ => None,
134        }
135    }
136}
137
138impl From<StoreError> for DecayError {
139    fn from(err: StoreError) -> Self {
140        Self::Store(err)
141    }
142}
143
144/// Maximum byte length of a deterministic-concatenated summary claim before
145/// the compressor truncates with the standard ellipsis suffix. Picked to
146/// stay comfortably below SQLite's text column / FTS5 tokeniser practical
147/// limits while remaining useful as a summary surface. Stable across runs.
148pub const DECAY_SUMMARY_MAX_CLAIM_BYTES: usize = 4096;
149
150/// Separator inserted between source claims during deterministic
151/// concatenation. Stable across runs.
152pub const DECAY_SUMMARY_CLAIM_SEPARATOR: &str = " | ";
153
154/// Suffix appended when the concatenated claim is truncated to
155/// [`DECAY_SUMMARY_MAX_CLAIM_BYTES`].
156pub const DECAY_SUMMARY_TRUNCATION_SUFFIX: &str = "... [truncated]";
157
158/// Schema version of the operator attestation envelope accepted by the
159/// LLM-summary surface. Today it mirrors the migration-attestation
160/// envelope's `schema_version == 1`; the LLM-summary purpose discriminator
161/// keeps the two domains structurally disjoint.
162pub const DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION: u16 = 1;
163
164/// Purpose discriminator baked into the LLM-summary operator attestation
165/// envelope so a captured envelope cannot be replayed against the
166/// migration-attestation surface (or vice versa).
167pub const DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE: &str = "cortex.decay.llm_summary";
168
169/// Kinds of decay operations the substrate currently supports.
170///
171/// The discriminator names returned by [`DecayJobKind::kind_wire`] are part
172/// of the SQLite `CHECK (kind IN ('episode_compression', ...))` constraint
173/// and MUST stay in sync with the migration.
174#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
175#[serde(tag = "kind", rename_all = "snake_case")]
176pub enum DecayJobKind {
177    /// Compress N old episodes into a single summary memory. Raw events stay
178    /// untouched; the resulting memory points at the source episode ids.
179    EpisodeCompression {
180        /// Source episodes whose interpreted content will be summarized.
181        source_episode_ids: Vec<EpisodeId>,
182        /// How the summary text is produced.
183        summary_method: SummaryMethod,
184    },
185    /// Compress N old candidate memories into a single summary candidate.
186    CandidateCompression {
187        /// Source memories whose claim text will be summarized.
188        source_memory_ids: Vec<MemoryId>,
189        /// How the summary text is produced.
190        summary_method: SummaryMethod,
191    },
192    /// Operator-fired re-promotion review of an expired principle waiver.
193    /// Reuses the principle promotion ceremony; the decay job is just the
194    /// scheduling vehicle so the review surfaces on a calendar instead of
195    /// requiring an out-of-band ticket.
196    ExpiredPrincipleReview {
197        /// Principle whose waiver has expired and which now needs review.
198        principle_id: PrincipleId,
199    },
200}
201
202impl DecayJobKind {
203    /// SQLite wire discriminator for the `kind` column.
204    #[must_use]
205    pub const fn kind_wire(&self) -> &'static str {
206        match self {
207            Self::EpisodeCompression { .. } => "episode_compression",
208            Self::CandidateCompression { .. } => "candidate_compression",
209            Self::ExpiredPrincipleReview { .. } => "expired_principle_review",
210        }
211    }
212
213    /// The summary method this kind carries, if it carries one. Operator
214    /// review jobs do not compress text and therefore do not carry one.
215    #[must_use]
216    pub fn summary_method(&self) -> Option<&SummaryMethod> {
217        match self {
218            Self::EpisodeCompression { summary_method, .. }
219            | Self::CandidateCompression { summary_method, .. } => Some(summary_method),
220            Self::ExpiredPrincipleReview { .. } => None,
221        }
222    }
223}
224
225/// How a compression decay job produces its summary text.
226///
227/// Serialized with an explicit `method` discriminator so deterministic and
228/// LLM-backed variants are distinguishable on the wire. The default
229/// representation chosen for tests is enforced by
230/// [`tests::summary_method_serializes_with_method_discriminator`].
231#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
232#[serde(tag = "method", rename_all = "snake_case")]
233pub enum SummaryMethod {
234    /// Deterministic concatenation + truncation. No LLM call. The original
235    /// content is not lost: provenance ids still point at the originals, so
236    /// any consumer can re-derive richer summaries later.
237    DeterministicConcatenate,
238    /// Operator-fired LLM summarization. Requires an explicit human review
239    /// step before the summary memory lands. The boolean is part of the
240    /// type, not a runtime config, so the gate cannot be silently dropped.
241    LlmSummary {
242        /// Whether an operator attestation must be supplied before the
243        /// resulting summary memory becomes durable. The substrate exists
244        /// only at `true` today; the field is kept as a `bool` so a future
245        /// non-operator-attested path (e.g. fully sandboxed quarantine
246        /// summarization) is representable without a wire break.
247        operator_attestation_required: bool,
248    },
249}
250
251impl SummaryMethod {
252    /// SQLite wire discriminator for the `summary_method` column.
253    ///
254    /// `ExpiredPrincipleReview` jobs have no summary text and persist
255    /// `"none"` in the column — that token is reserved here so the wire
256    /// alphabet is closed.
257    #[must_use]
258    pub const fn method_wire(&self) -> &'static str {
259        match self {
260            Self::DeterministicConcatenate => "deterministic_concatenate",
261            Self::LlmSummary { .. } => "llm_summary",
262        }
263    }
264}
265
266/// Sentinel wire token used in the `summary_method` column when the job kind
267/// does not carry a summary method (currently only `ExpiredPrincipleReview`).
268///
269/// Re-exported from `cortex_store::repo::SUMMARY_METHOD_NONE_WIRE` so the
270/// types module and the persistence module agree on a single source of
271/// truth.
272pub use cortex_store::repo::SUMMARY_METHOD_NONE_WIRE;
273
274/// Lifecycle state of a decay job. The terminal states (`Completed`,
275/// `Failed`, `Cancelled`) are observable but no longer subject to scheduling.
276///
277/// The discriminator returned by [`DecayJobState::state_wire`] matches the
278/// SQLite `CHECK (state IN ('pending', ...))` constraint.
279#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
280#[serde(tag = "state", rename_all = "snake_case")]
281pub enum DecayJobState {
282    /// Scheduled but not yet picked up by the worker.
283    Pending,
284    /// Worker has claimed the job and is processing it.
285    InProgress,
286    /// Worker finished successfully. `result_memory_id` is set when the
287    /// kind produced a durable summary memory; it is `None` for
288    /// `ExpiredPrincipleReview` jobs that only opened a ceremony.
289    Completed {
290        /// Identifier of the summary memory the job produced, if any.
291        result_memory_id: Option<MemoryId>,
292    },
293    /// Worker observed a refusal or store error. `reason` is the operator-
294    /// visible explanation; it MUST NOT be empty.
295    Failed {
296        /// Operator-visible failure explanation.
297        reason: String,
298    },
299    /// Operator cancelled the job before it ran (or before it completed).
300    Cancelled,
301}
302
303impl DecayJobState {
304    /// SQLite wire discriminator for the `state` column.
305    #[must_use]
306    pub const fn state_wire(&self) -> &'static str {
307        match self {
308            Self::Pending => "pending",
309            Self::InProgress => "in_progress",
310            Self::Completed { .. } => "completed",
311            Self::Failed { .. } => "failed",
312            Self::Cancelled => "cancelled",
313        }
314    }
315
316    /// True if the state is terminal (no further transitions are expected).
317    #[must_use]
318    pub const fn is_terminal(&self) -> bool {
319        matches!(
320            self,
321            Self::Completed { .. } | Self::Failed { .. } | Self::Cancelled
322        )
323    }
324
325    /// True if the state is eligible to be picked up by a worker.
326    #[must_use]
327    pub const fn is_scheduling_eligible(&self) -> bool {
328        matches!(self, Self::Pending)
329    }
330}
331
332/// Durable shape of a scheduled decay job.
333#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
334pub struct DecayJob {
335    /// Stable decay job identifier.
336    pub id: DecayJobId,
337    /// What this job does.
338    pub kind: DecayJobKind,
339    /// Current lifecycle state.
340    pub state: DecayJobState,
341    /// When the job becomes eligible to run.
342    pub scheduled_for: DateTime<Utc>,
343    /// When the job was created.
344    pub created_at: DateTime<Utc>,
345    /// Operator principal who created the job (free-form string at this
346    /// substrate layer; downstream callers may enforce typed actor records).
347    pub created_by: String,
348    /// When the job row was last mutated.
349    pub updated_at: DateTime<Utc>,
350}
351
352/// Error raised when a [`DecayJobRecord`] cannot be reassembled into a
353/// typed [`DecayJob`]. Surfaces on the persistence boundary when the
354/// store contains a row whose wire shape no longer matches the typed
355/// substrate (e.g. mid-deploy schema drift).
356#[derive(Debug)]
357pub enum DecayJobConversionError {
358    /// The `kind` wire token is not part of the closed alphabet.
359    UnknownKindWire(String),
360    /// The `summary_method` wire token is not part of the closed alphabet.
361    UnknownSummaryMethodWire(String),
362    /// The `state` wire token is not part of the closed alphabet.
363    UnknownStateWire(String),
364    /// `expired_principle_review` rows must carry `summary_method='none'`.
365    SummaryMethodKindMismatch {
366        /// Observed kind wire.
367        kind_wire: String,
368        /// Observed summary method wire.
369        summary_method_wire: String,
370    },
371    /// `source_ids_json` payload was shape-invalid for this kind.
372    InvalidSourceIdsJson(String),
373    /// A persisted id could not be parsed.
374    InvalidId(cortex_core::CoreError),
375    /// `failed` state rows must carry a non-empty state_reason.
376    MissingFailedReason,
377}
378
379impl fmt::Display for DecayJobConversionError {
380    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
381        match self {
382            Self::UnknownKindWire(wire) => write!(f, "unknown decay job kind wire `{wire}`"),
383            Self::UnknownSummaryMethodWire(wire) => {
384                write!(f, "unknown decay job summary method wire `{wire}`")
385            }
386            Self::UnknownStateWire(wire) => write!(f, "unknown decay job state wire `{wire}`"),
387            Self::SummaryMethodKindMismatch {
388                kind_wire,
389                summary_method_wire,
390            } => write!(
391                f,
392                "decay job kind `{kind_wire}` cannot persist summary_method `{summary_method_wire}`"
393            ),
394            Self::InvalidSourceIdsJson(msg) => {
395                write!(f, "decay job source_ids_json is malformed: {msg}")
396            }
397            Self::InvalidId(err) => write!(f, "decay job id parse error: {err}"),
398            Self::MissingFailedReason => write!(
399                f,
400                "decay job in `failed` state must carry a non-empty state_reason"
401            ),
402        }
403    }
404}
405
406impl Error for DecayJobConversionError {
407    fn source(&self) -> Option<&(dyn Error + 'static)> {
408        match self {
409            Self::InvalidId(err) => Some(err),
410            _ => None,
411        }
412    }
413}
414
415impl From<cortex_core::CoreError> for DecayJobConversionError {
416    fn from(err: cortex_core::CoreError) -> Self {
417        Self::InvalidId(err)
418    }
419}
420
421impl From<DecayJob> for DecayJobRecord {
422    fn from(job: DecayJob) -> Self {
423        let kind_wire = job.kind.kind_wire().to_string();
424        let (source_ids_json, summary_method_wire) = match &job.kind {
425            DecayJobKind::EpisodeCompression {
426                source_episode_ids,
427                summary_method,
428            } => (
429                Value::Array(
430                    source_episode_ids
431                        .iter()
432                        .map(|id| Value::String(id.to_string()))
433                        .collect(),
434                ),
435                summary_method.method_wire().to_string(),
436            ),
437            DecayJobKind::CandidateCompression {
438                source_memory_ids,
439                summary_method,
440            } => (
441                Value::Array(
442                    source_memory_ids
443                        .iter()
444                        .map(|id| Value::String(id.to_string()))
445                        .collect(),
446                ),
447                summary_method.method_wire().to_string(),
448            ),
449            DecayJobKind::ExpiredPrincipleReview { principle_id } => (
450                Value::Array(vec![Value::String(principle_id.to_string())]),
451                SUMMARY_METHOD_NONE_WIRE.to_string(),
452            ),
453        };
454        let (state_wire, state_reason, result_memory_id) = match &job.state {
455            DecayJobState::Pending | DecayJobState::InProgress | DecayJobState::Cancelled => {
456                (job.state.state_wire().to_string(), None, None)
457            }
458            DecayJobState::Completed { result_memory_id } => {
459                (job.state.state_wire().to_string(), None, *result_memory_id)
460            }
461            DecayJobState::Failed { reason } => (
462                job.state.state_wire().to_string(),
463                Some(reason.clone()),
464                None,
465            ),
466        };
467        DecayJobRecord {
468            id: job.id,
469            kind_wire,
470            summary_method_wire,
471            source_ids_json,
472            state_wire,
473            state_reason,
474            result_memory_id,
475            scheduled_for: job.scheduled_for,
476            created_at: job.created_at,
477            created_by: job.created_by,
478            updated_at: job.updated_at,
479        }
480    }
481}
482
483impl TryFrom<DecayJobRecord> for DecayJob {
484    type Error = DecayJobConversionError;
485
486    fn try_from(record: DecayJobRecord) -> Result<Self, Self::Error> {
487        let kind = parse_kind_from_record(
488            &record.kind_wire,
489            &record.summary_method_wire,
490            &record.source_ids_json,
491        )?;
492        let state = parse_state_from_record(
493            &record.state_wire,
494            record.state_reason,
495            record.result_memory_id,
496        )?;
497        Ok(DecayJob {
498            id: record.id,
499            kind,
500            state,
501            scheduled_for: record.scheduled_for,
502            created_at: record.created_at,
503            created_by: record.created_by,
504            updated_at: record.updated_at,
505        })
506    }
507}
508
509fn parse_kind_from_record(
510    kind_wire: &str,
511    summary_method_wire: &str,
512    source_ids_json: &Value,
513) -> Result<DecayJobKind, DecayJobConversionError> {
514    let array = source_ids_json.as_array().ok_or_else(|| {
515        DecayJobConversionError::InvalidSourceIdsJson("expected JSON array".into())
516    })?;
517    let strings = array
518        .iter()
519        .map(|v| {
520            v.as_str().map(str::to_string).ok_or_else(|| {
521                DecayJobConversionError::InvalidSourceIdsJson(
522                    "all source id entries must be strings".into(),
523                )
524            })
525        })
526        .collect::<Result<Vec<_>, _>>()?;
527
528    match kind_wire {
529        "episode_compression" => {
530            let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
531            let source_episode_ids = strings
532                .iter()
533                .map(|s| {
534                    s.parse::<EpisodeId>()
535                        .map_err(DecayJobConversionError::from)
536                })
537                .collect::<Result<Vec<_>, _>>()?;
538            Ok(DecayJobKind::EpisodeCompression {
539                source_episode_ids,
540                summary_method,
541            })
542        }
543        "candidate_compression" => {
544            let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
545            let source_memory_ids = strings
546                .iter()
547                .map(|s| s.parse::<MemoryId>().map_err(DecayJobConversionError::from))
548                .collect::<Result<Vec<_>, _>>()?;
549            Ok(DecayJobKind::CandidateCompression {
550                source_memory_ids,
551                summary_method,
552            })
553        }
554        "expired_principle_review" => {
555            if summary_method_wire != SUMMARY_METHOD_NONE_WIRE {
556                return Err(DecayJobConversionError::SummaryMethodKindMismatch {
557                    kind_wire: kind_wire.to_string(),
558                    summary_method_wire: summary_method_wire.to_string(),
559                });
560            }
561            let principle_id = match strings.as_slice() {
562                [single] => single
563                    .parse::<PrincipleId>()
564                    .map_err(DecayJobConversionError::from)?,
565                other => {
566                    return Err(DecayJobConversionError::InvalidSourceIdsJson(format!(
567                        "expired_principle_review expected exactly one source id, got {}",
568                        other.len()
569                    )))
570                }
571            };
572            Ok(DecayJobKind::ExpiredPrincipleReview { principle_id })
573        }
574        other => Err(DecayJobConversionError::UnknownKindWire(other.to_string())),
575    }
576}
577
578fn parse_summary_method(
579    method_wire: &str,
580    kind_wire: &str,
581) -> Result<SummaryMethod, DecayJobConversionError> {
582    match method_wire {
583        "deterministic_concatenate" => Ok(SummaryMethod::DeterministicConcatenate),
584        "llm_summary" => Ok(SummaryMethod::LlmSummary {
585            operator_attestation_required: true,
586        }),
587        SUMMARY_METHOD_NONE_WIRE => Err(DecayJobConversionError::SummaryMethodKindMismatch {
588            kind_wire: kind_wire.to_string(),
589            summary_method_wire: method_wire.to_string(),
590        }),
591        other => Err(DecayJobConversionError::UnknownSummaryMethodWire(
592            other.to_string(),
593        )),
594    }
595}
596
597fn parse_state_from_record(
598    state_wire: &str,
599    state_reason: Option<String>,
600    result_memory_id: Option<MemoryId>,
601) -> Result<DecayJobState, DecayJobConversionError> {
602    match state_wire {
603        "pending" => Ok(DecayJobState::Pending),
604        "in_progress" => Ok(DecayJobState::InProgress),
605        "completed" => Ok(DecayJobState::Completed { result_memory_id }),
606        "failed" => {
607            let reason = state_reason.ok_or(DecayJobConversionError::MissingFailedReason)?;
608            if reason.trim().is_empty() {
609                return Err(DecayJobConversionError::MissingFailedReason);
610            }
611            Ok(DecayJobState::Failed { reason })
612        }
613        "cancelled" => Ok(DecayJobState::Cancelled),
614        other => Err(DecayJobConversionError::UnknownStateWire(other.to_string())),
615    }
616}
617
618#[cfg(test)]
619mod tests {
620    use super::*;
621    use chrono::TimeZone;
622
623    fn ts() -> DateTime<Utc> {
624        Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
625    }
626
627    #[test]
628    fn decay_job_id_is_stable_round_trip() {
629        let id = DecayJobId::new();
630        let s = id.to_string();
631        assert!(s.starts_with("dcy_"), "decay job id prefix: {s}");
632        let back: DecayJobId = s.parse().expect("round-trip parse");
633        assert_eq!(back, id);
634
635        // Wrong prefix must fail.
636        let bad = format!("mem_{}", id.as_ulid());
637        assert!(bad.parse::<DecayJobId>().is_err());
638    }
639
640    #[test]
641    fn decay_job_state_transitions_are_well_typed() {
642        // The substrate does not enforce the transition graph (the worker
643        // does), but every state must report a stable wire name and a stable
644        // terminal/scheduling classification. This protects D3-B from a
645        // future variant being added without updating the worker contract.
646        let pending = DecayJobState::Pending;
647        assert_eq!(pending.state_wire(), "pending");
648        assert!(!pending.is_terminal());
649        assert!(pending.is_scheduling_eligible());
650
651        let in_progress = DecayJobState::InProgress;
652        assert_eq!(in_progress.state_wire(), "in_progress");
653        assert!(!in_progress.is_terminal());
654        assert!(!in_progress.is_scheduling_eligible());
655
656        let completed = DecayJobState::Completed {
657            result_memory_id: Some(MemoryId::new()),
658        };
659        assert_eq!(completed.state_wire(), "completed");
660        assert!(completed.is_terminal());
661        assert!(!completed.is_scheduling_eligible());
662
663        let completed_review = DecayJobState::Completed {
664            result_memory_id: None,
665        };
666        assert_eq!(completed_review.state_wire(), "completed");
667        assert!(completed_review.is_terminal());
668
669        let failed = DecayJobState::Failed {
670            reason: "store unavailable".into(),
671        };
672        assert_eq!(failed.state_wire(), "failed");
673        assert!(failed.is_terminal());
674        assert!(!failed.is_scheduling_eligible());
675
676        let cancelled = DecayJobState::Cancelled;
677        assert_eq!(cancelled.state_wire(), "cancelled");
678        assert!(cancelled.is_terminal());
679        assert!(!cancelled.is_scheduling_eligible());
680    }
681
682    #[test]
683    fn summary_method_serializes_with_method_discriminator() {
684        let det = SummaryMethod::DeterministicConcatenate;
685        let det_json = serde_json::to_value(&det).expect("serialize deterministic");
686        assert_eq!(
687            det_json,
688            serde_json::json!({"method": "deterministic_concatenate"}),
689        );
690        let det_back: SummaryMethod =
691            serde_json::from_value(det_json).expect("deserialize deterministic");
692        assert_eq!(det_back, det);
693
694        let llm = SummaryMethod::LlmSummary {
695            operator_attestation_required: true,
696        };
697        let llm_json = serde_json::to_value(&llm).expect("serialize llm");
698        assert_eq!(
699            llm_json,
700            serde_json::json!({
701                "method": "llm_summary",
702                "operator_attestation_required": true,
703            }),
704        );
705        let llm_back: SummaryMethod = serde_json::from_value(llm_json).expect("deserialize llm");
706        assert_eq!(llm_back, llm);
707
708        assert_eq!(det.method_wire(), "deterministic_concatenate");
709        assert_eq!(llm.method_wire(), "llm_summary");
710    }
711
712    #[test]
713    fn decay_job_kind_wire_matches_migration_alphabet() {
714        let episode = DecayJobKind::EpisodeCompression {
715            source_episode_ids: vec![EpisodeId::new()],
716            summary_method: SummaryMethod::DeterministicConcatenate,
717        };
718        assert_eq!(episode.kind_wire(), "episode_compression");
719        assert!(episode.summary_method().is_some());
720
721        let candidate = DecayJobKind::CandidateCompression {
722            source_memory_ids: vec![MemoryId::new()],
723            summary_method: SummaryMethod::LlmSummary {
724                operator_attestation_required: true,
725            },
726        };
727        assert_eq!(candidate.kind_wire(), "candidate_compression");
728        assert!(candidate.summary_method().is_some());
729
730        let review = DecayJobKind::ExpiredPrincipleReview {
731            principle_id: PrincipleId::new(),
732        };
733        assert_eq!(review.kind_wire(), "expired_principle_review");
734        assert!(review.summary_method().is_none());
735    }
736
737    #[test]
738    fn decay_job_round_trips_through_json() {
739        let job = DecayJob {
740            id: DecayJobId::new(),
741            kind: DecayJobKind::CandidateCompression {
742                source_memory_ids: vec![MemoryId::new(), MemoryId::new()],
743                summary_method: SummaryMethod::DeterministicConcatenate,
744            },
745            state: DecayJobState::Pending,
746            scheduled_for: ts(),
747            created_at: ts(),
748            created_by: "operator:test".into(),
749            updated_at: ts(),
750        };
751        let bytes = serde_json::to_vec(&job).expect("serialize job");
752        let back: DecayJob = serde_json::from_slice(&bytes).expect("deserialize job");
753        assert_eq!(back, job);
754    }
755
756    #[test]
757    fn decay_job_round_trips_through_persistence_record() {
758        // Episode compression with deterministic summary, pending state.
759        let job = DecayJob {
760            id: DecayJobId::new(),
761            kind: DecayJobKind::EpisodeCompression {
762                source_episode_ids: vec![EpisodeId::new(), EpisodeId::new()],
763                summary_method: SummaryMethod::DeterministicConcatenate,
764            },
765            state: DecayJobState::Pending,
766            scheduled_for: ts(),
767            created_at: ts(),
768            created_by: "operator:test".into(),
769            updated_at: ts(),
770        };
771        let record: DecayJobRecord = job.clone().into();
772        assert_eq!(record.kind_wire, "episode_compression");
773        assert_eq!(record.summary_method_wire, "deterministic_concatenate");
774        assert_eq!(record.state_wire, "pending");
775        let back: DecayJob = record.try_into().expect("record -> job");
776        assert_eq!(back, job);
777
778        // Candidate compression with LLM summary, completed with memory.
779        let memory = MemoryId::new();
780        let job = DecayJob {
781            id: DecayJobId::new(),
782            kind: DecayJobKind::CandidateCompression {
783                source_memory_ids: vec![MemoryId::new()],
784                summary_method: SummaryMethod::LlmSummary {
785                    operator_attestation_required: true,
786                },
787            },
788            state: DecayJobState::Completed {
789                result_memory_id: Some(memory),
790            },
791            scheduled_for: ts(),
792            created_at: ts(),
793            created_by: "operator:test".into(),
794            updated_at: ts(),
795        };
796        let record: DecayJobRecord = job.clone().into();
797        assert_eq!(record.summary_method_wire, "llm_summary");
798        assert_eq!(record.state_wire, "completed");
799        assert_eq!(record.result_memory_id, Some(memory));
800        let back: DecayJob = record.try_into().expect("record -> job");
801        assert_eq!(back, job);
802
803        // Expired principle review, failed state.
804        let job = DecayJob {
805            id: DecayJobId::new(),
806            kind: DecayJobKind::ExpiredPrincipleReview {
807                principle_id: PrincipleId::new(),
808            },
809            state: DecayJobState::Failed {
810                reason: "operator absent".into(),
811            },
812            scheduled_for: ts(),
813            created_at: ts(),
814            created_by: "operator:test".into(),
815            updated_at: ts(),
816        };
817        let record: DecayJobRecord = job.clone().into();
818        assert_eq!(record.kind_wire, "expired_principle_review");
819        assert_eq!(record.summary_method_wire, SUMMARY_METHOD_NONE_WIRE);
820        assert_eq!(record.state_wire, "failed");
821        assert_eq!(record.state_reason.as_deref(), Some("operator absent"));
822        let back: DecayJob = record.try_into().expect("record -> job");
823        assert_eq!(back, job);
824    }
825
826    #[test]
827    fn decay_job_record_rejects_kind_summary_method_mismatch() {
828        let record = DecayJobRecord {
829            id: DecayJobId::new(),
830            kind_wire: "expired_principle_review".into(),
831            summary_method_wire: "deterministic_concatenate".into(),
832            source_ids_json: serde_json::json!([PrincipleId::new().to_string()]),
833            state_wire: "pending".into(),
834            state_reason: None,
835            result_memory_id: None,
836            scheduled_for: ts(),
837            created_at: ts(),
838            created_by: "operator:test".into(),
839            updated_at: ts(),
840        };
841        let err = DecayJob::try_from(record).expect_err("kind/method mismatch must fail");
842        assert!(matches!(
843            err,
844            DecayJobConversionError::SummaryMethodKindMismatch { .. }
845        ));
846    }
847}