// cortex_memory/decay/compress.rs
//! Deterministic-concatenate compression for Phase 4.D decay jobs.
//!
//! Given N candidate-tier memories (or N episode summaries), produce ONE
//! summary memory whose:
//!
//! - `claim` is a deterministic concatenation + truncation of the sources'
//!   claims (separator [`super::DECAY_SUMMARY_CLAIM_SEPARATOR`], truncation
//!   at [`super::DECAY_SUMMARY_MAX_CLAIM_BYTES`] with suffix
//!   [`super::DECAY_SUMMARY_TRUNCATION_SUFFIX`]).
//! - `confidence` = `min(source.confidence)` — pessimistic.
//! - `authority` = lowest-trust authority across sources. The trust order is
//!   the [`AuthorityTier::trust_rank`] ladder; compression CANNOT launder
//!   authority upward.
//! - `source_episodes_json` / `source_events_json` = deduplicated union of
//!   the sources' provenance arrays.
//! - `domains_json` = deduplicated union of the sources' domain tags.
//! - `created_at` / `updated_at` = `Utc::now()`.
//! - `memory_type` = `"summary"` (so downstream readers can distinguish a
//!   compressed-from-many memory from a primary candidate).
//! - `status` = `"candidate"` (the compressed summary is candidate-tier
//!   evidence; the standard acceptance ceremony still applies).
//!
//! The source memories / episodes are NOT deleted; the
//! `memory_supersessions` / `episode_supersessions` join tables landed by
//! migration `009_decay_supersessions` record the "this summary was
//! produced by compressing X, Y, Z" edge.
//!
//! ## Determinism contract
//!
//! Same inputs (source rows, source-id order) MUST produce a summary memory
//! whose claim, confidence, authority, and provenance arrays are
//! byte-stable across runs. The only non-deterministic field is the
//! summary memory's id (ULID) and its `created_at` / `updated_at`
//! timestamps. The runner ([`super::runner`]) and the compressor are
//! intentionally separate so this determinism stays inspectable from the
//! call site.
37
38use std::collections::BTreeSet;
39
40use chrono::Utc;
41use cortex_core::{DecayJobId, EpisodeId, MemoryId};
42use cortex_store::repo::{DecayJobRepo, EpisodeRepo, MemoryCandidate, MemoryRecord, MemoryRepo};
43use cortex_store::Pool;
44use serde_json::Value;
45
46use super::{
47    DecayError, DecayResult, DECAY_COMPRESS_INPUT_INVALID_INVARIANT,
48    DECAY_COMPRESS_SOURCE_MISSING_INVARIANT, DECAY_SUMMARY_CLAIM_SEPARATOR,
49    DECAY_SUMMARY_MAX_CLAIM_BYTES, DECAY_SUMMARY_TRUNCATION_SUFFIX,
50};
51
/// Trust-tier ranking used to fold an authority label across N sources.
///
/// The compressor cannot launder authority upward: the produced summary's
/// authority tier equals the LOWEST tier among the sources. The string
/// representation written to the durable row is the lowest source's
/// authority string verbatim — that is, if all sources are `"candidate"`
/// then the produced summary is `"candidate"`, even if the trust rank
/// would notionally permit a higher label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AuthorityTier {
    /// Derived, tool, runtime, external, missing, or failed-verify source.
    /// Lowest trust.
    Derived,
    /// Candidate-tier memory (not yet accepted).
    Candidate,
    /// Verified agent-authored source.
    Agent,
    /// Verified user / manual-correction source. Highest trust.
    User,
}

impl AuthorityTier {
    /// Integer trust rank: lower is less trusted.
    ///
    /// Variant declaration order mirrors the trust ladder, so the enum
    /// discriminant IS the rank (and the derived `Ord` agrees with it:
    /// `Derived` = 0 up through `User` = 3).
    #[must_use]
    pub const fn trust_rank(self) -> u8 {
        self as u8
    }

    /// Best-effort parse of a stored authority string into a tier rank.
    /// Unknown values are conservatively classified as the lowest tier
    /// ([`AuthorityTier::Derived`]) so an unfamiliar label cannot
    /// accidentally launder a compression upward.
    #[must_use]
    pub fn parse_lenient(authority: &str) -> Self {
        if matches!(authority, "user" | "User") {
            Self::User
        } else if matches!(authority, "agent" | "Agent") {
            Self::Agent
        } else if matches!(authority, "candidate" | "Candidate") {
            Self::Candidate
        } else {
            // ADR 0019 derived snake_case form and any other unknown label.
            Self::Derived
        }
    }
}
100
/// Compress N candidate-tier memories into one summary memory and persist
/// the produced summary as a candidate row.
///
/// Convenience wrapper over [`compress_candidate_memories_with_job`] with
/// no decay-job attachment (`job_id = None`).
///
/// Idempotency note: this function does not check whether the same source
/// set has already been compressed — the runner ([`super::runner`]) owns
/// the "do not re-run completed jobs" guard. Callers outside the runner
/// must enforce idempotency themselves.
pub fn compress_candidate_memories(
    pool: &Pool,
    source_memory_ids: &[MemoryId],
    operator: &str,
) -> DecayResult<MemoryId> {
    compress_candidate_memories_with_job(pool, source_memory_ids, operator, None)
}
115
116/// Same as [`compress_candidate_memories`] but attaches the produced
117/// supersession edges to a specific decay job (so the runner can write the
118/// audit trail under the job's id).
119pub fn compress_candidate_memories_with_job(
120    pool: &Pool,
121    source_memory_ids: &[MemoryId],
122    operator: &str,
123    job_id: Option<&DecayJobId>,
124) -> DecayResult<MemoryId> {
125    if source_memory_ids.is_empty() {
126        return Err(DecayError::Validation(format!(
127            "{DECAY_COMPRESS_INPUT_INVALID_INVARIANT}: source_memory_ids must be non-empty"
128        )));
129    }
130    if operator.trim().is_empty() {
131        return Err(DecayError::Validation(
132            "operator label must be non-empty".into(),
133        ));
134    }
135
136    let memory_repo = MemoryRepo::new(pool);
137    let mut sources = Vec::with_capacity(source_memory_ids.len());
138    for id in source_memory_ids {
139        match memory_repo.get_by_id(id)? {
140            Some(record) => sources.push(record),
141            None => {
142                return Err(DecayError::Validation(format!(
143                    "{DECAY_COMPRESS_SOURCE_MISSING_INVARIANT}: memory {id} not found"
144                )));
145            }
146        }
147    }
148
149    let summary = build_memory_summary(&sources, source_memory_ids);
150    let summary_id = summary.id;
151    memory_repo.insert_candidate(&summary)?;
152
153    let job_repo = DecayJobRepo::new(pool);
154    let now = Utc::now();
155    for source in &sources {
156        job_repo.record_memory_supersession(&source.id, &summary_id, job_id, now)?;
157    }
158
159    Ok(summary_id)
160}
161
/// Compress N episode summaries into one summary memory and persist the
/// produced summary as a candidate row.
///
/// Convenience wrapper over [`compress_episodes_with_job`] with no
/// decay-job attachment (`job_id = None`).
pub fn compress_episodes(
    pool: &Pool,
    source_episode_ids: &[EpisodeId],
    operator: &str,
) -> DecayResult<MemoryId> {
    compress_episodes_with_job(pool, source_episode_ids, operator, None)
}
171
172/// Same as [`compress_episodes`] but attaches the produced supersession
173/// edges to a specific decay job.
174pub fn compress_episodes_with_job(
175    pool: &Pool,
176    source_episode_ids: &[EpisodeId],
177    operator: &str,
178    job_id: Option<&DecayJobId>,
179) -> DecayResult<MemoryId> {
180    if source_episode_ids.is_empty() {
181        return Err(DecayError::Validation(format!(
182            "{DECAY_COMPRESS_INPUT_INVALID_INVARIANT}: source_episode_ids must be non-empty"
183        )));
184    }
185    if operator.trim().is_empty() {
186        return Err(DecayError::Validation(
187            "operator label must be non-empty".into(),
188        ));
189    }
190
191    let episode_repo = EpisodeRepo::new(pool);
192    let mut sources = Vec::with_capacity(source_episode_ids.len());
193    for id in source_episode_ids {
194        match episode_repo.get_by_id(id)? {
195            Some(record) => sources.push(record),
196            None => {
197                return Err(DecayError::Validation(format!(
198                    "{DECAY_COMPRESS_SOURCE_MISSING_INVARIANT}: episode {id} not found"
199                )));
200            }
201        }
202    }
203
204    let summary = build_episode_summary(&sources, source_episode_ids);
205    let summary_id = summary.id;
206    let memory_repo = MemoryRepo::new(pool);
207    memory_repo.insert_candidate(&summary)?;
208
209    let job_repo = DecayJobRepo::new(pool);
210    let now = Utc::now();
211    for source in &sources {
212        job_repo.record_episode_supersession(&source.id, &summary_id, job_id, now)?;
213    }
214
215    Ok(summary_id)
216}
217
/// Fold N memory records into a single summary `MemoryCandidate`.
///
/// Deterministic given the source rows and `source_ids` order; only the
/// fresh ULID and the `created_at` / `updated_at` stamps differ across runs.
fn build_memory_summary(sources: &[MemoryRecord], source_ids: &[MemoryId]) -> MemoryCandidate {
    // Claim, confidence, and authority all fold pessimistically
    // (concatenate-then-truncate, min, lowest trust tier — see module docs).
    let claim = concatenate_claims(sources.iter().map(|m| m.claim.as_str()));
    let confidence = pessimistic_min_confidence(sources.iter().map(|m| m.confidence));
    let authority = lowest_authority_label(sources.iter().map(|m| m.authority.as_str()));

    let domains = union_json_strings(sources.iter().map(|m| &m.domains_json));
    let source_events = union_json_strings(sources.iter().map(|m| &m.source_events_json));

    // The summary's `source_episodes_json` is the union of the sources'
    // episode arrays. We keep it as the union-of-source-arrays so retrieval
    // can follow the existing lineage shape without learning about a new
    // column. The reference to the source memory ids themselves is encoded
    // into `applies_when_json` under the `summary_of_memories` key (so the
    // existing schema does not grow a new column).
    let source_episodes = union_json_strings(sources.iter().map(|m| &m.source_episodes_json));

    // `MemoryRepo::insert_candidate` requires at least one of
    // `source_episodes_json` or `source_events_json` to be non-empty. If
    // BOTH are empty after union (degenerate input where every source has
    // empty provenance), we fall back to encoding the source memory ids
    // into `source_episodes_json` as a last-resort lineage anchor. The
    // primary lineage channel is `memory_supersessions` regardless, so
    // retrieval will still resolve the edge.
    let source_episodes =
        if json_array_is_empty(&source_episodes) && json_array_is_empty(&source_events) {
            Value::Array(
                source_ids
                    .iter()
                    .map(|id| Value::String(id.to_string()))
                    .collect(),
            )
        } else {
            source_episodes
        };

    let now = Utc::now();
    let applies_when = source_memory_provenance_envelope(source_ids);

    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim,
        source_episodes_json: source_episodes,
        source_events_json: source_events,
        domains_json: domains,
        // No salience carried over: the summary starts from a clean slate.
        salience_json: Value::Object(serde_json::Map::new()),
        confidence,
        authority,
        applies_when_json: applies_when,
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: now,
        updated_at: now,
    }
}
272
273fn build_episode_summary(
274    sources: &[cortex_store::repo::EpisodeRecord],
275    source_ids: &[EpisodeId],
276) -> MemoryCandidate {
277    let claim = concatenate_claims(sources.iter().map(|e| e.summary.as_str()));
278    let confidence = pessimistic_min_confidence(sources.iter().map(|e| e.confidence));
279    // Episodes do not carry an explicit authority tier; fold to Derived
280    // conservatively (lowest trust). The acceptance ceremony will raise
281    // the trust label upstream if applicable.
282    let authority = AuthorityTier::Derived;
283
284    let domains = union_json_strings(sources.iter().map(|e| &e.domains_json));
285    // The episode rows are the source provenance: the produced summary's
286    // `source_episodes_json` is the list of source episode ids.
287    let source_episodes = source_episode_id_array(source_ids);
288    // Union of the episode rows' own `source_events_json` keeps the event
289    // lineage chain intact end-to-end.
290    let source_events = union_json_strings(sources.iter().map(|e| &e.source_events_json));
291
292    let now = Utc::now();
293    let applies_when = source_episode_provenance_envelope(source_ids);
294
295    MemoryCandidate {
296        id: MemoryId::new(),
297        memory_type: "summary".into(),
298        claim,
299        source_episodes_json: source_episodes,
300        source_events_json: source_events,
301        domains_json: domains,
302        salience_json: Value::Object(serde_json::Map::new()),
303        confidence,
304        authority: authority_label_for(authority),
305        applies_when_json: applies_when,
306        does_not_apply_when_json: Value::Array(Vec::new()),
307        created_at: now,
308        updated_at: now,
309    }
310}
311
312fn concatenate_claims<'a, I: IntoIterator<Item = &'a str>>(claims: I) -> String {
313    let joined: String = claims
314        .into_iter()
315        .collect::<Vec<_>>()
316        .join(DECAY_SUMMARY_CLAIM_SEPARATOR);
317
318    if joined.len() <= DECAY_SUMMARY_MAX_CLAIM_BYTES {
319        return joined;
320    }
321
322    // Truncate at the last UTF-8 boundary that leaves room for the suffix.
323    let suffix = DECAY_SUMMARY_TRUNCATION_SUFFIX;
324    let budget = DECAY_SUMMARY_MAX_CLAIM_BYTES.saturating_sub(suffix.len());
325    let mut end = budget;
326    while end > 0 && !joined.is_char_boundary(end) {
327        end -= 1;
328    }
329    let mut out = String::with_capacity(end + suffix.len());
330    out.push_str(&joined[..end]);
331    out.push_str(suffix);
332    out
333}
334
/// Pessimistic confidence fold: the minimum source confidence, clamped to
/// `[0.0, 1.0]`.
///
/// An empty input yields `0.0` (no evidence — zero confidence). The
/// previous fold over `f64::INFINITY` clamped the vacuous minimum to
/// `1.0`, i.e. the MOST optimistic value, inverting the pessimistic
/// contract for the degenerate case. Callers in this module validate
/// non-empty source sets, so they never observe the change.
fn pessimistic_min_confidence<I: IntoIterator<Item = f64>>(values: I) -> f64 {
    let mut values = values.into_iter().peekable();
    if values.peek().is_none() {
        return 0.0;
    }
    // `f64::min` ignores a NaN operand, matching the original fold.
    values
        .fold(f64::INFINITY, f64::min)
        .clamp(0.0, 1.0)
}
341
342fn lowest_authority_label<'a, I: IntoIterator<Item = &'a str>>(labels: I) -> String {
343    let mut min_tier = AuthorityTier::User;
344    let mut min_label: Option<String> = None;
345    for label in labels {
346        let tier = AuthorityTier::parse_lenient(label);
347        if tier <= min_tier {
348            min_tier = tier;
349            min_label = Some(label.to_string());
350        }
351    }
352    min_label.unwrap_or_else(|| authority_label_for(min_tier))
353}
354
355fn authority_label_for(tier: AuthorityTier) -> String {
356    match tier {
357        AuthorityTier::Derived => "derived".into(),
358        AuthorityTier::Candidate => "candidate".into(),
359        AuthorityTier::Agent => "agent".into(),
360        AuthorityTier::User => "user".into(),
361    }
362}
363
364fn union_json_strings<'a, I: IntoIterator<Item = &'a Value>>(arrays: I) -> Value {
365    let mut seen: BTreeSet<String> = BTreeSet::new();
366    // Preserve first-seen ordering for stability, but dedupe via BTreeSet.
367    let mut ordered: Vec<Value> = Vec::new();
368    for value in arrays {
369        match value {
370            Value::Array(items) => {
371                for item in items {
372                    let key = canonical_key(item);
373                    if seen.insert(key) {
374                        ordered.push(item.clone());
375                    }
376                }
377            }
378            // Be lenient: some legacy rows may store a single string. Treat
379            // it as a single-element array for the union.
380            Value::String(s) => {
381                let v = Value::String(s.clone());
382                let key = canonical_key(&v);
383                if seen.insert(key) {
384                    ordered.push(v);
385                }
386            }
387            _ => {}
388        }
389    }
390    Value::Array(ordered)
391}
392
393fn json_array_is_empty(value: &Value) -> bool {
394    match value {
395        Value::Array(a) => a.is_empty(),
396        _ => true,
397    }
398}
399
400fn canonical_key(value: &Value) -> String {
401    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
402}
403
404fn source_memory_provenance_envelope(source_ids: &[MemoryId]) -> Value {
405    serde_json::json!({
406        "summary_of_memories": source_ids
407            .iter()
408            .map(ToString::to_string)
409            .collect::<Vec<_>>(),
410    })
411}
412
413fn source_episode_provenance_envelope(source_ids: &[EpisodeId]) -> Value {
414    serde_json::json!({
415        "summary_of_episodes": source_ids
416            .iter()
417            .map(ToString::to_string)
418            .collect::<Vec<_>>(),
419    })
420}
421
422fn source_episode_id_array(source_ids: &[EpisodeId]) -> Value {
423    Value::Array(
424        source_ids
425            .iter()
426            .map(|id| Value::String(id.to_string()))
427            .collect(),
428    )
429}
430
431#[cfg(test)]
432mod tests {
433    use super::*;
434    use cortex_core::{
435        compose_policy_outcomes, PolicyContribution, PolicyDecision, PolicyOutcome, TraceId,
436    };
437    use cortex_store::migrate::apply_pending;
438    use cortex_store::repo::{EpisodeRecord, EpisodeRepo, TraceRepo};
439    use rusqlite::Connection;
440
    /// Fresh in-memory SQLite pool with all migrations applied.
    fn seed_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }
446
    /// Seed one candidate memory row with the given claim / confidence /
    /// authority / domains / event lineage; returns its id.
    fn insert_test_memory(
        pool: &Pool,
        claim: &str,
        confidence: f64,
        authority: &str,
        domains: &[&str],
        source_event_ids: &[&str],
    ) -> MemoryId {
        let id = MemoryId::new();
        let candidate = MemoryCandidate {
            id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(
                source_event_ids
                    .iter()
                    .map(|s| Value::String((*s).into()))
                    .collect(),
            ),
            domains_json: Value::Array(
                domains.iter().map(|s| Value::String((*s).into())).collect(),
            ),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence,
            authority: authority.into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool)
            .insert_candidate(&candidate)
            .expect("insert");
        id
    }
483
    /// Minimal allow-all policy decision satisfying `EpisodeRepo::insert`'s
    /// lineage and redaction contribution requirements.
    fn dummy_episode_policy() -> PolicyDecision {
        compose_policy_outcomes(
            vec![
                PolicyContribution::new(
                    "episode.insert.source_event_lineage",
                    PolicyOutcome::Allow,
                    "test seed",
                )
                .expect("contribution"),
                PolicyContribution::new(
                    "episode.insert.redaction_status",
                    PolicyOutcome::Allow,
                    "test seed",
                )
                .expect("contribution"),
            ],
            None,
        )
    }
503
    /// Seed one interpreted episode (plus the trace it requires) with the
    /// given summary / confidence / event lineage; returns its id.
    fn insert_test_episode(
        pool: &Pool,
        summary: &str,
        confidence: f64,
        source_event_ids: &[&str],
    ) -> EpisodeId {
        // Episodes need a trace; insert a minimal trace via TraceRepo.
        let trace_id = TraceId::new();
        let trace = cortex_core::Trace {
            id: trace_id,
            schema_version: 1,
            opened_at: Utc::now(),
            closed_at: None,
            event_ids: Vec::new(),
            trace_type: "test".into(),
            status: cortex_core::TraceStatus::Open,
        };
        TraceRepo::new(pool).open(&trace).expect("insert trace");
        let id = EpisodeId::new();
        let record = EpisodeRecord {
            id,
            trace_id,
            source_events_json: Value::Array(
                source_event_ids
                    .iter()
                    .map(|s| Value::String((*s).into()))
                    .collect(),
            ),
            summary: summary.into(),
            domains_json: Value::Array(vec![Value::String("test-domain".into())]),
            entities_json: Value::Array(Vec::new()),
            candidate_meaning: None,
            extracted_by_json: Value::Object(serde_json::Map::new()),
            confidence,
            status: "interpreted".into(),
        };
        EpisodeRepo::new(pool)
            .insert(&record, &dummy_episode_policy())
            .expect("insert episode");
        id
    }
545
    // Provenance: the summary's event and domain arrays are the DEDUPED
    // union of the sources' arrays, and memory_type is "summary".
    #[test]
    fn compress_candidate_memories_preserves_provenance() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a", "b"],
            &[
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F01",
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F02",
            ],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.7,
            "candidate",
            &["b", "c"],
            &[
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F02",
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F03",
            ],
        );

        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();

        let events = match summary.source_events_json {
            Value::Array(v) => v,
            other => panic!("expected array, got {other:?}"),
        };
        let event_strings: BTreeSet<String> = events
            .iter()
            .filter_map(|v| v.as_str().map(str::to_string))
            .collect();
        assert!(event_strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5F01"));
        assert!(event_strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5F02"));
        assert!(event_strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5F03"));
        // Union, not duplication: the shared event id appears exactly once.
        assert_eq!(event_strings.len(), 3);

        let domains = match summary.domains_json {
            Value::Array(v) => v,
            other => panic!("expected array, got {other:?}"),
        };
        let domain_strings: BTreeSet<String> = domains
            .iter()
            .filter_map(|v| v.as_str().map(str::to_string))
            .collect();
        assert!(domain_strings.contains("a"));
        assert!(domain_strings.contains("b"));
        assert!(domain_strings.contains("c"));
        assert_eq!(domain_strings.len(), 3);

        assert_eq!(summary.memory_type, "summary");
    }
608
    // Confidence folds to min(sources) — 0.42 here, not the mean or max.
    #[test]
    fn compress_candidate_memories_confidence_is_pessimistic_min() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.95,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.42,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let m3 = insert_test_memory(
            &pool,
            "gamma",
            0.7,
            "candidate",
            &["c"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F03"],
        );

        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2, m3], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        assert!(
            (summary.confidence - 0.42).abs() < 1e-9,
            "got {}",
            summary.confidence
        );
    }
649
    // Authority folds to the lowest trust tier among the sources.
    #[test]
    fn compress_candidate_memories_authority_is_lowest_tier() {
        let pool = seed_pool();
        // User > Agent > Candidate > Derived.
        let m_user = insert_test_memory(
            &pool,
            "u",
            0.9,
            "user",
            &["x"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m_agent = insert_test_memory(
            &pool,
            "a",
            0.9,
            "agent",
            &["x"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let m_cand = insert_test_memory(
            &pool,
            "c",
            0.9,
            "candidate",
            &["x"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F03"],
        );

        let summary_id = compress_candidate_memories(&pool, &[m_user, m_agent, m_cand], "op-x")
            .expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        // The lowest tier present is "candidate" — its label is preserved
        // verbatim on the produced summary (no upward laundering).
        assert_eq!(summary.authority, "candidate");
    }
689
    // Compression is additive: the source rows survive it.
    #[test]
    fn compress_candidate_memories_does_not_delete_sources() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.7,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );

        let _summary = compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");

        let repo = MemoryRepo::new(&pool);
        assert!(
            repo.get_by_id(&m1).unwrap().is_some(),
            "source m1 must remain"
        );
        assert!(
            repo.get_by_id(&m2).unwrap().is_some(),
            "source m2 must remain"
        );
    }
722
    // The memory_supersessions join table records exactly the source set.
    #[test]
    fn compress_candidate_memories_records_supersession_edges() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.7,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");

        let job_repo = DecayJobRepo::new(&pool);
        let sources = job_repo
            .list_memory_sources_for(&summary_id)
            .expect("list sources");
        let set: BTreeSet<String> = sources.iter().map(ToString::to_string).collect();
        assert!(set.contains(&m1.to_string()));
        assert!(set.contains(&m2.to_string()));
        assert_eq!(set.len(), 2);
    }
754
    // Episode compression: event lineage union, min confidence, and the
    // episode_supersessions edges all land.
    #[test]
    fn compress_episodes_preserves_event_provenance() {
        let pool = seed_pool();
        let e1 = insert_test_episode(
            &pool,
            "alpha episode",
            0.9,
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5E01"],
        );
        let e2 = insert_test_episode(
            &pool,
            "beta episode",
            0.6,
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5E02"],
        );

        let summary_id = compress_episodes(&pool, &[e1, e2], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        let events = match summary.source_events_json {
            Value::Array(v) => v,
            other => panic!("expected array, got {other:?}"),
        };
        let strings: BTreeSet<String> = events
            .iter()
            .filter_map(|v| v.as_str().map(str::to_string))
            .collect();
        assert!(strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5E01"));
        assert!(strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5E02"));

        // Pessimistic min confidence.
        assert!(
            (summary.confidence - 0.6).abs() < 1e-9,
            "got {}",
            summary.confidence
        );

        // Episode supersession edges recorded.
        let job_repo = DecayJobRepo::new(&pool);
        let sources = job_repo
            .list_episode_sources_for(&summary_id)
            .expect("list sources");
        assert_eq!(sources.len(), 2);
    }
801
    // Two max-budget claims force truncation: the result stays within the
    // byte budget and carries the truncation suffix.
    #[test]
    fn compress_truncates_claims_over_budget() {
        let pool = seed_pool();
        let long = "x".repeat(DECAY_SUMMARY_MAX_CLAIM_BYTES);
        let m1 = insert_test_memory(
            &pool,
            &long,
            0.9,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            &long,
            0.9,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        assert!(summary.claim.ends_with(DECAY_SUMMARY_TRUNCATION_SUFFIX));
        assert!(summary.claim.len() <= DECAY_SUMMARY_MAX_CLAIM_BYTES);
    }
831
    // Empty source set is a Validation error carrying the input invariant.
    #[test]
    fn compress_refuses_empty_sources() {
        let pool = seed_pool();
        let err = compress_candidate_memories(&pool, &[], "op-x").unwrap_err();
        match err {
            DecayError::Validation(msg) => {
                assert!(msg.contains(DECAY_COMPRESS_INPUT_INVALID_INVARIANT));
            }
            other => panic!("expected validation, got {other:?}"),
        }
    }
843
    // A whitespace-only operator label is rejected before any DB write.
    #[test]
    fn compress_refuses_empty_operator() {
        let pool = seed_pool();
        let m = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let err = compress_candidate_memories(&pool, &[m], "   ").unwrap_err();
        assert!(matches!(err, DecayError::Validation(_)));
    }
858
    // A source id that resolves to no row aborts with the missing-source
    // invariant (no partial compression).
    #[test]
    fn compress_refuses_missing_source() {
        let pool = seed_pool();
        let phantom = MemoryId::new();
        let err = compress_candidate_memories(&pool, &[phantom], "op-x").unwrap_err();
        match err {
            DecayError::Validation(msg) => {
                assert!(msg.contains(DECAY_COMPRESS_SOURCE_MISSING_INVARIANT));
            }
            other => panic!("expected validation, got {other:?}"),
        }
    }
871
    #[test]
    fn compress_is_deterministic_for_same_inputs() {
        // Determinism contract: two compressions of the same source set
        // (modulo memory-id and timestamps) must produce byte-stable
        // claim/confidence/authority/provenance.
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.42,
            "candidate",
            &["a", "b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.8,
            "candidate",
            &["b", "c"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );

        let sum1 = compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress 1");
        let sum2 = compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress 2");
        let repo = MemoryRepo::new(&pool);
        let r1 = repo.get_by_id(&sum1).unwrap().unwrap();
        let r2 = repo.get_by_id(&sum2).unwrap().unwrap();
        assert_eq!(r1.claim, r2.claim);
        assert_eq!(r1.confidence, r2.confidence);
        assert_eq!(r1.authority, r2.authority);
        assert_eq!(r1.source_events_json, r2.source_events_json);
        assert_eq!(r1.domains_json, r2.domains_json);
    }
906}