crtx-memory 0.1.1

Memory lifecycle, salience, decay policies, and contradiction objects.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
//! Phase 4.D decay job substrate.
//!
//! Decay jobs are operator-fired scheduled tasks that compress or summarize
//! OLD memory rows (and operator-driven re-promotion reviews of expired
//! principle waivers). They do NOT touch the immutable, hash-chained event
//! log: the Phase 4.D guardrail is that raw events and the hash chain remain
//! unchanged. Compression operates one level up, on the semantic memory
//! surface and on candidate principles whose policy waiver has expired.
//!
//! Provenance is preserved: every summary memory landed by a decay job
//! retains pointers back to its source memory ids and (transitively) the
//! source event ids those memories already carry, via the same
//! `source_episodes_json` / `source_events_json` fields existing memory
//! candidates use. A summary is therefore a re-shaping of *already-attested*
//! content rather than a new claim with weaker lineage.
//!
//! This module owns the **types** (state machine, kind discriminator, summary
//! method enum) and the conversion shims used by the persistence layer in
//! `cortex-store::repo::decay_jobs`. It does NOT own:
//!
//! - The actual compression / summarization implementation (Phase 4.D D3-B).
//! - The CLI surface that operators use to fire jobs (Phase 4.D D3-C).
//! - Any retrieval-side rewiring (Phase 4.D2).
//!
//! Downstream agents land on the types defined here. The wire forms produced
//! by [`DecayJobKind::kind_wire`], [`SummaryMethod::method_wire`], and
//! [`DecayJobState::state_wire`] are part of the persistence contract — they
//! match the `CHECK` constraints in `migrations/008_decay_jobs.sql`.

pub mod compress;
pub mod runner;
pub mod summary;

use std::error::Error;
use std::fmt;

use chrono::{DateTime, Utc};
use cortex_core::{DecayJobId, EpisodeId, MemoryId, PrincipleId};
use cortex_store::repo::DecayJobRecord;
use cortex_store::StoreError;
use serde::{Deserialize, Serialize};
use serde_json::Value;

/// Result alias for the decay module: `Ok(T)` on success, [`DecayError`] on
/// any store failure, validation failure, or LLM-summary refusal.
pub type DecayResult<T> = Result<T, DecayError>;

/// Errors raised by the Phase 4.D decay path.
///
/// Only the three `LlmSummary*` variants map to a stable invariant token via
/// [`DecayError::invariant`]; `Store` and `Validation` return `None` there.
/// `Store` is the only variant that exposes an underlying error through
/// `std::error::Error::source`.
#[derive(Debug)]
pub enum DecayError {
    /// Underlying store boundary failed. Built from a [`StoreError`] via the
    /// `From` impl, so `?` lifts store failures into this variant.
    Store(StoreError),
    /// Caller supplied an invalid job input (empty source set, mixed id
    /// space, source not found, etc.). The string is the human-readable
    /// explanation rendered after `validation failed: ` by `Display`.
    Validation(String),
    /// LLM summary refused: no operator attestation supplied.
    LlmSummaryRequiresOperatorAttestation,
    /// LLM summary attestation was malformed / failed verification. The
    /// string carries the rejection detail.
    LlmSummaryAttestationRejected(String),
    /// LLM summary refused: the configured [`cortex_llm::SummaryBackend`]
    /// rejected the call (model not on allowlist, prompt template
    /// digest mismatch, upstream call failed, output validation failed,
    /// or the noop default backend is wired). The string carries the
    /// backend's typed reason for grep-friendly operator scripts.
    LlmSummaryBackendCallFailed(String),
}

/// Stable invariant surfaced by [`DecayError::LlmSummaryRequiresOperatorAttestation`].
/// Appears as the `invariant=<token>` prefix of that variant's `Display`
/// output and as the return value of [`DecayError::invariant`].
pub const DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT: &str =
    "decay.llm_summary.requires_operator_attestation";

/// Stable invariant surfaced by [`DecayError::LlmSummaryAttestationRejected`].
/// Appears as the `invariant=<token>` prefix of that variant's `Display`
/// output and as the return value of [`DecayError::invariant`].
pub const DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT: &str =
    "decay.llm_summary.attestation_rejected";

/// Stable invariant surfaced by [`DecayError::LlmSummaryBackendCallFailed`].
/// Operator scripts that grep for this token can distinguish a
/// backend-side refusal from an attestation-shape refusal.
pub const DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT: &str =
    "decay.llm_summary.backend_call_failed";

// NOTE(review): neither of the two `decay.compress.*` tokens below is returned
// by `DecayError::invariant` or embedded by `Display` in this file; presumably
// the runner places them inside a `DecayError::Validation` message — confirm.

/// Stable invariant surfaced when the runner refuses to compress an empty
/// source set or a heterogeneous (mixed memory + episode) source set.
pub const DECAY_COMPRESS_INPUT_INVALID_INVARIANT: &str = "decay.compress.input_invalid";

/// Stable invariant surfaced when the runner cannot find one of the source
/// rows named by a decay job.
pub const DECAY_COMPRESS_SOURCE_MISSING_INVARIANT: &str = "decay.compress.source_missing";

impl DecayError {
    /// Returns the stable invariant token tied to this error, when one exists.
    ///
    /// The three LLM-summary refusals each map to their dedicated
    /// `DECAY_LLM_SUMMARY_*_INVARIANT` constant. Store and validation
    /// failures carry no token and yield `None`. The match lists every
    /// variant explicitly so that adding a variant forces a decision here.
    #[must_use]
    pub fn invariant(&self) -> Option<&'static str> {
        let token = match self {
            Self::Store(_) | Self::Validation(_) => return None,
            Self::LlmSummaryRequiresOperatorAttestation => {
                DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT
            }
            Self::LlmSummaryAttestationRejected(_) => {
                DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT
            }
            Self::LlmSummaryBackendCallFailed(_) => {
                DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT
            }
        };
        Some(token)
    }
}

// Operator-facing rendering of `DecayError`. The three LLM-summary variants
// lead with `invariant=<token>` so scripts can grep for the stable token; the
// store and validation variants render a plain prefix followed by the detail.
impl fmt::Display for DecayError {
    /// Writes the single-line, human-readable form of the error to `f`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Plain prefix + the wrapped error's own `Display` output.
            Self::Store(err) => write!(f, "store error: {err}"),
            Self::Validation(message) => write!(f, "validation failed: {message}"),
            // The `invariant=` prefix interpolates the module-level constant,
            // so the rendered token always matches `DecayError::invariant`.
            Self::LlmSummaryRequiresOperatorAttestation => write!(
                f,
                "invariant={DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT} LLM summary refused: operator attestation is required (operator-fired only)"
            ),
            Self::LlmSummaryAttestationRejected(detail) => write!(
                f,
                "invariant={DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT} LLM summary attestation rejected: {detail}"
            ),
            Self::LlmSummaryBackendCallFailed(detail) => write!(
                f,
                "invariant={DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT} LLM summary backend call failed: {detail}"
            ),
        }
    }
}

impl Error for DecayError {
    /// Exposes the wrapped store error as the cause; every other variant is
    /// a leaf error with no underlying source.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::Store(inner) => Some(inner),
            Self::Validation(_)
            | Self::LlmSummaryRequiresOperatorAttestation
            | Self::LlmSummaryAttestationRejected(_)
            | Self::LlmSummaryBackendCallFailed(_) => None,
        }
    }
}

impl From<StoreError> for DecayError {
    fn from(err: StoreError) -> Self {
        Self::Store(err)
    }
}

/// Maximum byte length of a deterministic-concatenated summary claim before
/// the compressor truncates with the standard ellipsis suffix. Picked to
/// stay comfortably below SQLite's text column / FTS5 tokeniser practical
/// limits while remaining useful as a summary surface. Stable across runs.
///
/// The unit is bytes, not characters.
// NOTE(review): whether the limit includes the truncation suffix, and how a
// cut that lands inside a multi-byte character is handled, is decided in the
// `compress` module, which is not visible here — confirm there.
pub const DECAY_SUMMARY_MAX_CLAIM_BYTES: usize = 4096;

/// Separator inserted between source claims during deterministic
/// concatenation. Stable across runs. Three ASCII bytes: space, pipe, space.
pub const DECAY_SUMMARY_CLAIM_SEPARATOR: &str = " | ";

/// Suffix appended when the concatenated claim is truncated to
/// [`DECAY_SUMMARY_MAX_CLAIM_BYTES`]. Fifteen ASCII bytes.
pub const DECAY_SUMMARY_TRUNCATION_SUFFIX: &str = "... [truncated]";

/// Schema version of the operator attestation envelope accepted by the
/// LLM-summary surface. Today it mirrors the migration-attestation
/// envelope's `schema_version == 1`; the LLM-summary purpose discriminator
/// keeps the two domains structurally disjoint.
pub const DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION: u16 = 1;

/// Purpose discriminator baked into the LLM-summary operator attestation
/// envelope so a captured envelope cannot be replayed against the
/// migration-attestation surface (or vice versa).
pub const DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE: &str = "cortex.decay.llm_summary";

/// Kinds of decay operations the substrate currently supports.
///
/// The discriminator names returned by [`DecayJobKind::kind_wire`] are part
/// of the SQLite `CHECK (kind IN ('episode_compression', ...))` constraint
/// and MUST stay in sync with the migration.
///
/// The serde form is internally tagged on a `kind` field with snake_case
/// variant names, which produces the same three tokens that
/// [`DecayJobKind::kind_wire`] returns.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum DecayJobKind {
    /// Compress N old episodes into a single summary memory. Raw events stay
    /// untouched; the resulting memory points at the source episode ids.
    EpisodeCompression {
        /// Source episodes whose interpreted content will be summarized.
        source_episode_ids: Vec<EpisodeId>,
        /// How the summary text is produced.
        summary_method: SummaryMethod,
    },
    /// Compress N old candidate memories into a single summary candidate.
    CandidateCompression {
        /// Source memories whose claim text will be summarized.
        source_memory_ids: Vec<MemoryId>,
        /// How the summary text is produced.
        summary_method: SummaryMethod,
    },
    /// Operator-fired re-promotion review of an expired principle waiver.
    /// Reuses the principle promotion ceremony; the decay job is just the
    /// scheduling vehicle so the review surfaces on a calendar instead of
    /// requiring an out-of-band ticket.
    ///
    /// Carries no summary method; the persistence layer stores the `none`
    /// sentinel for it and a single-element source id array.
    ExpiredPrincipleReview {
        /// Principle whose waiver has expired and which now needs review.
        principle_id: PrincipleId,
    },
}

impl DecayJobKind {
    /// SQLite wire discriminator for the `kind` column.
    #[must_use]
    pub const fn kind_wire(&self) -> &'static str {
        match self {
            Self::EpisodeCompression { .. } => "episode_compression",
            Self::CandidateCompression { .. } => "candidate_compression",
            Self::ExpiredPrincipleReview { .. } => "expired_principle_review",
        }
    }

    /// The summary method this kind carries, if it carries one. Operator
    /// review jobs do not compress text and therefore do not carry one.
    #[must_use]
    pub fn summary_method(&self) -> Option<&SummaryMethod> {
        match self {
            Self::EpisodeCompression { summary_method, .. }
            | Self::CandidateCompression { summary_method, .. } => Some(summary_method),
            Self::ExpiredPrincipleReview { .. } => None,
        }
    }
}

/// How a compression decay job produces its summary text.
///
/// Serialized with an explicit `method` discriminator so deterministic and
/// LLM-backed variants are distinguishable on the wire. The default
/// representation chosen for tests is enforced by the unit test
/// `summary_method_serializes_with_method_discriminator` in this module's
/// `#[cfg(test)]` block (named in plain text because a test-only item cannot
/// be resolved as an intra-doc link in a normal doc build).
///
/// The snake_case serde tokens equal the values returned by
/// [`SummaryMethod::method_wire`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "method", rename_all = "snake_case")]
pub enum SummaryMethod {
    /// Deterministic concatenation + truncation. No LLM call. The original
    /// content is not lost: provenance ids still point at the originals, so
    /// any consumer can re-derive richer summaries later.
    DeterministicConcatenate,
    /// Operator-fired LLM summarization. Requires an explicit human review
    /// step before the summary memory lands. The boolean is part of the
    /// type, not a runtime config, so the gate cannot be silently dropped.
    LlmSummary {
        /// Whether an operator attestation must be supplied before the
        /// resulting summary memory becomes durable. The substrate exists
        /// only at `true` today; the field is kept as a `bool` so a future
        /// non-operator-attested path (e.g. fully sandboxed quarantine
        /// summarization) is representable without a wire break.
        ///
        /// The `DecayJobRecord` conversion in this module persists only the
        /// method token, not this flag; a row read back with `llm_summary`
        /// always rehydrates with the flag set to `true`.
        operator_attestation_required: bool,
    },
}

impl SummaryMethod {
    /// SQLite wire discriminator for the `summary_method` column.
    ///
    /// `ExpiredPrincipleReview` jobs have no summary text and persist
    /// `"none"` in the column — that token is reserved here so the wire
    /// alphabet is closed.
    #[must_use]
    pub const fn method_wire(&self) -> &'static str {
        match self {
            Self::DeterministicConcatenate => "deterministic_concatenate",
            Self::LlmSummary { .. } => "llm_summary",
        }
    }
}

/// Sentinel wire token used in the `summary_method` column when the job kind
/// does not carry a summary method (currently only `ExpiredPrincipleReview`).
///
/// Re-exported from `cortex_store::repo::SUMMARY_METHOD_NONE_WIRE` so the
/// types module and the persistence module agree on a single source of
/// truth.
pub use cortex_store::repo::SUMMARY_METHOD_NONE_WIRE;

/// Lifecycle state of a decay job. The terminal states (`Completed`,
/// `Failed`, `Cancelled`) are observable but no longer subject to scheduling.
///
/// The discriminator returned by [`DecayJobState::state_wire`] matches the
/// SQLite `CHECK (state IN ('pending', ...))` constraint.
///
/// The serde form is internally tagged on a `state` field with snake_case
/// variant names, which produces the same tokens as
/// [`DecayJobState::state_wire`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum DecayJobState {
    /// Scheduled but not yet picked up by the worker. The only state that
    /// [`DecayJobState::is_scheduling_eligible`] accepts.
    Pending,
    /// Worker has claimed the job and is processing it.
    InProgress,
    /// Worker finished successfully. `result_memory_id` is set when the
    /// kind produced a durable summary memory; it is `None` for
    /// `ExpiredPrincipleReview` jobs that only opened a ceremony.
    Completed {
        /// Identifier of the summary memory the job produced, if any.
        result_memory_id: Option<MemoryId>,
    },
    /// Worker observed a refusal or store error. `reason` is the operator-
    /// visible explanation; it MUST NOT be empty.
    ///
    /// The type itself does not enforce non-emptiness. The check happens when
    /// a persisted row is converted back into a `DecayJob`, where a missing
    /// or whitespace-only reason is rejected.
    Failed {
        /// Operator-visible failure explanation.
        reason: String,
    },
    /// Operator cancelled the job before it ran (or before it completed).
    Cancelled,
}

impl DecayJobState {
    /// SQLite wire discriminator for the `state` column.
    #[must_use]
    pub const fn state_wire(&self) -> &'static str {
        match self {
            Self::Pending => "pending",
            Self::InProgress => "in_progress",
            Self::Completed { .. } => "completed",
            Self::Failed { .. } => "failed",
            Self::Cancelled => "cancelled",
        }
    }

    /// True if the state is terminal (no further transitions are expected).
    #[must_use]
    pub const fn is_terminal(&self) -> bool {
        matches!(
            self,
            Self::Completed { .. } | Self::Failed { .. } | Self::Cancelled
        )
    }

    /// True if the state is eligible to be picked up by a worker.
    #[must_use]
    pub const fn is_scheduling_eligible(&self) -> bool {
        matches!(self, Self::Pending)
    }
}

/// Durable shape of a scheduled decay job.
///
/// This is the typed view of a job. It converts infallibly into the flat
/// `DecayJobRecord` persistence row (`From<DecayJob>`) and fallibly back from
/// one (`TryFrom<DecayJobRecord>`, failing with `DecayJobConversionError`
/// when the stored wire tokens or ids do not parse).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DecayJob {
    /// Stable decay job identifier.
    pub id: DecayJobId,
    /// What this job does.
    pub kind: DecayJobKind,
    /// Current lifecycle state.
    pub state: DecayJobState,
    /// When the job becomes eligible to run.
    pub scheduled_for: DateTime<Utc>,
    /// When the job was created.
    pub created_at: DateTime<Utc>,
    /// Operator principal who created the job (free-form string at this
    /// substrate layer; downstream callers may enforce typed actor records).
    pub created_by: String,
    /// When the job row was last mutated.
    pub updated_at: DateTime<Utc>,
}

/// Error raised when a [`DecayJobRecord`] cannot be reassembled into a
/// typed [`DecayJob`]. Surfaces on the persistence boundary when the
/// store contains a row whose wire shape no longer matches the typed
/// substrate (e.g. mid-deploy schema drift).
#[derive(Debug)]
pub enum DecayJobConversionError {
    /// The `kind` wire token is not part of the closed alphabet.
    UnknownKindWire(String),
    /// The `summary_method` wire token is not part of the closed alphabet.
    UnknownSummaryMethodWire(String),
    /// The `state` wire token is not part of the closed alphabet.
    UnknownStateWire(String),
    /// The `kind` / `summary_method` pairing is inconsistent, in either
    /// direction: `expired_principle_review` rows must carry
    /// `summary_method='none'`, and compression rows must NOT carry it.
    SummaryMethodKindMismatch {
        /// Observed kind wire.
        kind_wire: String,
        /// Observed summary method wire.
        summary_method_wire: String,
    },
    /// `source_ids_json` payload was shape-invalid for this kind: not a JSON
    /// array, an entry that is not a string, or (for
    /// `expired_principle_review`) not exactly one entry.
    InvalidSourceIdsJson(String),
    /// A persisted id could not be parsed.
    InvalidId(cortex_core::CoreError),
    /// `failed` state rows must carry a non-empty state_reason. A reason
    /// that is absent or consists only of whitespace triggers this.
    MissingFailedReason,
}

// Human-readable rendering of conversion failures. Offending wire tokens are
// quoted in backticks so the bad value is unambiguous in logs.
impl fmt::Display for DecayJobConversionError {
    /// Writes the single-line description of the conversion failure to `f`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::UnknownKindWire(wire) => write!(f, "unknown decay job kind wire `{wire}`"),
            Self::UnknownSummaryMethodWire(wire) => {
                write!(f, "unknown decay job summary method wire `{wire}`")
            }
            Self::UnknownStateWire(wire) => write!(f, "unknown decay job state wire `{wire}`"),
            // Names both halves of the inconsistent pairing.
            Self::SummaryMethodKindMismatch {
                kind_wire,
                summary_method_wire,
            } => write!(
                f,
                "decay job kind `{kind_wire}` cannot persist summary_method `{summary_method_wire}`"
            ),
            Self::InvalidSourceIdsJson(msg) => {
                write!(f, "decay job source_ids_json is malformed: {msg}")
            }
            // Appends the wrapped id-parse error's own `Display` output.
            Self::InvalidId(err) => write!(f, "decay job id parse error: {err}"),
            Self::MissingFailedReason => write!(
                f,
                "decay job in `failed` state must carry a non-empty state_reason"
            ),
        }
    }
}

impl Error for DecayJobConversionError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            Self::InvalidId(err) => Some(err),
            _ => None,
        }
    }
}

impl From<cortex_core::CoreError> for DecayJobConversionError {
    fn from(err: cortex_core::CoreError) -> Self {
        Self::InvalidId(err)
    }
}

impl From<DecayJob> for DecayJobRecord {
    fn from(job: DecayJob) -> Self {
        let kind_wire = job.kind.kind_wire().to_string();
        let (source_ids_json, summary_method_wire) = match &job.kind {
            DecayJobKind::EpisodeCompression {
                source_episode_ids,
                summary_method,
            } => (
                Value::Array(
                    source_episode_ids
                        .iter()
                        .map(|id| Value::String(id.to_string()))
                        .collect(),
                ),
                summary_method.method_wire().to_string(),
            ),
            DecayJobKind::CandidateCompression {
                source_memory_ids,
                summary_method,
            } => (
                Value::Array(
                    source_memory_ids
                        .iter()
                        .map(|id| Value::String(id.to_string()))
                        .collect(),
                ),
                summary_method.method_wire().to_string(),
            ),
            DecayJobKind::ExpiredPrincipleReview { principle_id } => (
                Value::Array(vec![Value::String(principle_id.to_string())]),
                SUMMARY_METHOD_NONE_WIRE.to_string(),
            ),
        };
        let (state_wire, state_reason, result_memory_id) = match &job.state {
            DecayJobState::Pending | DecayJobState::InProgress | DecayJobState::Cancelled => {
                (job.state.state_wire().to_string(), None, None)
            }
            DecayJobState::Completed { result_memory_id } => {
                (job.state.state_wire().to_string(), None, *result_memory_id)
            }
            DecayJobState::Failed { reason } => (
                job.state.state_wire().to_string(),
                Some(reason.clone()),
                None,
            ),
        };
        DecayJobRecord {
            id: job.id,
            kind_wire,
            summary_method_wire,
            source_ids_json,
            state_wire,
            state_reason,
            result_memory_id,
            scheduled_for: job.scheduled_for,
            created_at: job.created_at,
            created_by: job.created_by,
            updated_at: job.updated_at,
        }
    }
}

impl TryFrom<DecayJobRecord> for DecayJob {
    type Error = DecayJobConversionError;

    fn try_from(record: DecayJobRecord) -> Result<Self, Self::Error> {
        let kind = parse_kind_from_record(
            &record.kind_wire,
            &record.summary_method_wire,
            &record.source_ids_json,
        )?;
        let state = parse_state_from_record(
            &record.state_wire,
            record.state_reason,
            record.result_memory_id,
        )?;
        Ok(DecayJob {
            id: record.id,
            kind,
            state,
            scheduled_for: record.scheduled_for,
            created_at: record.created_at,
            created_by: record.created_by,
            updated_at: record.updated_at,
        })
    }
}

fn parse_kind_from_record(
    kind_wire: &str,
    summary_method_wire: &str,
    source_ids_json: &Value,
) -> Result<DecayJobKind, DecayJobConversionError> {
    let array = source_ids_json.as_array().ok_or_else(|| {
        DecayJobConversionError::InvalidSourceIdsJson("expected JSON array".into())
    })?;
    let strings = array
        .iter()
        .map(|v| {
            v.as_str().map(str::to_string).ok_or_else(|| {
                DecayJobConversionError::InvalidSourceIdsJson(
                    "all source id entries must be strings".into(),
                )
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    match kind_wire {
        "episode_compression" => {
            let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
            let source_episode_ids = strings
                .iter()
                .map(|s| {
                    s.parse::<EpisodeId>()
                        .map_err(DecayJobConversionError::from)
                })
                .collect::<Result<Vec<_>, _>>()?;
            Ok(DecayJobKind::EpisodeCompression {
                source_episode_ids,
                summary_method,
            })
        }
        "candidate_compression" => {
            let summary_method = parse_summary_method(summary_method_wire, kind_wire)?;
            let source_memory_ids = strings
                .iter()
                .map(|s| s.parse::<MemoryId>().map_err(DecayJobConversionError::from))
                .collect::<Result<Vec<_>, _>>()?;
            Ok(DecayJobKind::CandidateCompression {
                source_memory_ids,
                summary_method,
            })
        }
        "expired_principle_review" => {
            if summary_method_wire != SUMMARY_METHOD_NONE_WIRE {
                return Err(DecayJobConversionError::SummaryMethodKindMismatch {
                    kind_wire: kind_wire.to_string(),
                    summary_method_wire: summary_method_wire.to_string(),
                });
            }
            let principle_id = match strings.as_slice() {
                [single] => single
                    .parse::<PrincipleId>()
                    .map_err(DecayJobConversionError::from)?,
                other => {
                    return Err(DecayJobConversionError::InvalidSourceIdsJson(format!(
                        "expired_principle_review expected exactly one source id, got {}",
                        other.len()
                    )))
                }
            };
            Ok(DecayJobKind::ExpiredPrincipleReview { principle_id })
        }
        other => Err(DecayJobConversionError::UnknownKindWire(other.to_string())),
    }
}

fn parse_summary_method(
    method_wire: &str,
    kind_wire: &str,
) -> Result<SummaryMethod, DecayJobConversionError> {
    match method_wire {
        "deterministic_concatenate" => Ok(SummaryMethod::DeterministicConcatenate),
        "llm_summary" => Ok(SummaryMethod::LlmSummary {
            operator_attestation_required: true,
        }),
        SUMMARY_METHOD_NONE_WIRE => Err(DecayJobConversionError::SummaryMethodKindMismatch {
            kind_wire: kind_wire.to_string(),
            summary_method_wire: method_wire.to_string(),
        }),
        other => Err(DecayJobConversionError::UnknownSummaryMethodWire(
            other.to_string(),
        )),
    }
}

/// Rebuilds a [`DecayJobState`] from the flattened columns of a record.
///
/// `state_reason` is consulted only for the `"failed"` wire name and
/// `result_memory_id` only for `"completed"`; for every other state the
/// values are dropped without inspection.
///
/// # Errors
///
/// * [`DecayJobConversionError::MissingFailedReason`] when the state is
///   `"failed"` and the reason is absent or contains only whitespace.
/// * [`DecayJobConversionError::UnknownStateWire`] when `state_wire` is not
///   one of the five recognised names.
fn parse_state_from_record(
    state_wire: &str,
    state_reason: Option<String>,
    result_memory_id: Option<MemoryId>,
) -> Result<DecayJobState, DecayJobConversionError> {
    let state = match state_wire {
        "pending" => DecayJobState::Pending,
        "in_progress" => DecayJobState::InProgress,
        "completed" => DecayJobState::Completed { result_memory_id },
        "cancelled" => DecayJobState::Cancelled,
        "failed" => match state_reason {
            // The reason is stored untrimmed; trimming is only used to
            // detect a blank value.
            Some(reason) if !reason.trim().is_empty() => DecayJobState::Failed { reason },
            _ => return Err(DecayJobConversionError::MissingFailedReason),
        },
        unknown => {
            return Err(DecayJobConversionError::UnknownStateWire(
                unknown.to_string(),
            ))
        }
    };
    Ok(state)
}

#[cfg(test)]
mod tests {
    // Unit tests for decay-job ids, wire names, JSON shape, and the
    // conversion between `DecayJob` and its flattened `DecayJobRecord`.

    use super::*;
    use chrono::TimeZone;

    /// Fixed timestamp shared by every fixture so equality checks are
    /// deterministic.
    fn ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2026, 5, 13, 12, 0, 0).unwrap()
    }

    /// Builds a job with a fresh id, the fixed test timestamps, and the
    /// `operator:test` creator, so each test spells out only the kind and
    /// state it is exercising.
    fn job_with(kind: DecayJobKind, state: DecayJobState) -> DecayJob {
        DecayJob {
            id: DecayJobId::new(),
            kind,
            state,
            scheduled_for: ts(),
            created_at: ts(),
            created_by: "operator:test".into(),
            updated_at: ts(),
        }
    }

    /// A fresh id renders with the `dcy_` prefix, parses back to an equal
    /// value, and is rejected when the same ULID carries a `mem_` prefix.
    #[test]
    fn decay_job_id_is_stable_round_trip() {
        let id = DecayJobId::new();
        let s = id.to_string();
        assert!(s.starts_with("dcy_"), "decay job id prefix: {s}");
        let back: DecayJobId = s.parse().expect("round-trip parse");
        assert_eq!(back, id);

        // Wrong prefix must fail.
        let bad = format!("mem_{}", id.as_ulid());
        assert!(bad.parse::<DecayJobId>().is_err());
    }

    /// Pins the wire name, terminal flag, and scheduling-eligibility flag of
    /// every state, including both shapes of `Completed`.
    #[test]
    fn decay_job_state_transitions_are_well_typed() {
        // The substrate does not enforce the transition graph (the worker
        // does), but every state must report a stable wire name and a stable
        // terminal/scheduling classification. This protects D3-B from a
        // future variant being added without updating the worker contract.
        let pending = DecayJobState::Pending;
        assert_eq!(pending.state_wire(), "pending");
        assert!(!pending.is_terminal());
        assert!(pending.is_scheduling_eligible());

        let in_progress = DecayJobState::InProgress;
        assert_eq!(in_progress.state_wire(), "in_progress");
        assert!(!in_progress.is_terminal());
        assert!(!in_progress.is_scheduling_eligible());

        let completed = DecayJobState::Completed {
            result_memory_id: Some(MemoryId::new()),
        };
        assert_eq!(completed.state_wire(), "completed");
        assert!(completed.is_terminal());
        assert!(!completed.is_scheduling_eligible());

        // Completion without a result memory must classify exactly like
        // completion with one.
        let completed_review = DecayJobState::Completed {
            result_memory_id: None,
        };
        assert_eq!(completed_review.state_wire(), "completed");
        assert!(completed_review.is_terminal());
        assert!(!completed_review.is_scheduling_eligible());

        let failed = DecayJobState::Failed {
            reason: "store unavailable".into(),
        };
        assert_eq!(failed.state_wire(), "failed");
        assert!(failed.is_terminal());
        assert!(!failed.is_scheduling_eligible());

        let cancelled = DecayJobState::Cancelled;
        assert_eq!(cancelled.state_wire(), "cancelled");
        assert!(cancelled.is_terminal());
        assert!(!cancelled.is_scheduling_eligible());
    }

    /// Both summary methods serialize to an object tagged by `method`,
    /// deserialize back to an equal value, and report the same tag through
    /// `method_wire()`.
    #[test]
    fn summary_method_serializes_with_method_discriminator() {
        let det = SummaryMethod::DeterministicConcatenate;
        let det_json = serde_json::to_value(&det).expect("serialize deterministic");
        assert_eq!(
            det_json,
            serde_json::json!({"method": "deterministic_concatenate"}),
        );
        let det_back: SummaryMethod =
            serde_json::from_value(det_json).expect("deserialize deterministic");
        assert_eq!(det_back, det);

        let llm = SummaryMethod::LlmSummary {
            operator_attestation_required: true,
        };
        let llm_json = serde_json::to_value(&llm).expect("serialize llm");
        assert_eq!(
            llm_json,
            serde_json::json!({
                "method": "llm_summary",
                "operator_attestation_required": true,
            }),
        );
        let llm_back: SummaryMethod = serde_json::from_value(llm_json).expect("deserialize llm");
        assert_eq!(llm_back, llm);

        assert_eq!(det.method_wire(), "deterministic_concatenate");
        assert_eq!(llm.method_wire(), "llm_summary");
    }

    /// Each job kind reports its expected wire name, and only the two
    /// compression kinds expose a summary method.
    #[test]
    fn decay_job_kind_wire_matches_migration_alphabet() {
        let episode = DecayJobKind::EpisodeCompression {
            source_episode_ids: vec![EpisodeId::new()],
            summary_method: SummaryMethod::DeterministicConcatenate,
        };
        assert_eq!(episode.kind_wire(), "episode_compression");
        assert!(episode.summary_method().is_some());

        let candidate = DecayJobKind::CandidateCompression {
            source_memory_ids: vec![MemoryId::new()],
            summary_method: SummaryMethod::LlmSummary {
                operator_attestation_required: true,
            },
        };
        assert_eq!(candidate.kind_wire(), "candidate_compression");
        assert!(candidate.summary_method().is_some());

        let review = DecayJobKind::ExpiredPrincipleReview {
            principle_id: PrincipleId::new(),
        };
        assert_eq!(review.kind_wire(), "expired_principle_review");
        assert!(review.summary_method().is_none());
    }

    /// A whole job survives a serialize/deserialize cycle through JSON bytes.
    #[test]
    fn decay_job_round_trips_through_json() {
        let job = job_with(
            DecayJobKind::CandidateCompression {
                source_memory_ids: vec![MemoryId::new(), MemoryId::new()],
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            DecayJobState::Pending,
        );
        let bytes = serde_json::to_vec(&job).expect("serialize job");
        let back: DecayJob = serde_json::from_slice(&bytes).expect("deserialize job");
        assert_eq!(back, job);
    }

    /// Converting a job into a `DecayJobRecord` and back yields an equal job
    /// for each kind, and the record exposes the expected wire columns.
    #[test]
    fn decay_job_round_trips_through_persistence_record() {
        // Episode compression with deterministic summary, pending state.
        let job = job_with(
            DecayJobKind::EpisodeCompression {
                source_episode_ids: vec![EpisodeId::new(), EpisodeId::new()],
                summary_method: SummaryMethod::DeterministicConcatenate,
            },
            DecayJobState::Pending,
        );
        let record: DecayJobRecord = job.clone().into();
        assert_eq!(record.kind_wire, "episode_compression");
        assert_eq!(record.summary_method_wire, "deterministic_concatenate");
        assert_eq!(record.state_wire, "pending");
        let back: DecayJob = record.try_into().expect("record -> job");
        assert_eq!(back, job);

        // Candidate compression with LLM summary, completed with memory.
        let memory = MemoryId::new();
        let job = job_with(
            DecayJobKind::CandidateCompression {
                source_memory_ids: vec![MemoryId::new()],
                summary_method: SummaryMethod::LlmSummary {
                    operator_attestation_required: true,
                },
            },
            DecayJobState::Completed {
                result_memory_id: Some(memory),
            },
        );
        let record: DecayJobRecord = job.clone().into();
        assert_eq!(record.summary_method_wire, "llm_summary");
        assert_eq!(record.state_wire, "completed");
        assert_eq!(record.result_memory_id, Some(memory));
        let back: DecayJob = record.try_into().expect("record -> job");
        assert_eq!(back, job);

        // Expired principle review, failed state.
        let job = job_with(
            DecayJobKind::ExpiredPrincipleReview {
                principle_id: PrincipleId::new(),
            },
            DecayJobState::Failed {
                reason: "operator absent".into(),
            },
        );
        let record: DecayJobRecord = job.clone().into();
        assert_eq!(record.kind_wire, "expired_principle_review");
        assert_eq!(record.summary_method_wire, SUMMARY_METHOD_NONE_WIRE);
        assert_eq!(record.state_wire, "failed");
        assert_eq!(record.state_reason.as_deref(), Some("operator absent"));
        let back: DecayJob = record.try_into().expect("record -> job");
        assert_eq!(back, job);
    }

    /// A record whose kind is `expired_principle_review` but which carries a
    /// real summary method is rejected with `SummaryMethodKindMismatch`.
    #[test]
    fn decay_job_record_rejects_kind_summary_method_mismatch() {
        let record = DecayJobRecord {
            id: DecayJobId::new(),
            kind_wire: "expired_principle_review".into(),
            summary_method_wire: "deterministic_concatenate".into(),
            source_ids_json: serde_json::json!([PrincipleId::new().to_string()]),
            state_wire: "pending".into(),
            state_reason: None,
            result_memory_id: None,
            scheduled_for: ts(),
            created_at: ts(),
            created_by: "operator:test".into(),
            updated_at: ts(),
        };
        let err = DecayJob::try_from(record).expect_err("kind/method mismatch must fail");
        assert!(matches!(
            err,
            DecayJobConversionError::SummaryMethodKindMismatch { .. }
        ));
    }
}