use std::collections::BTreeSet;
use chrono::Utc;
use cortex_core::{DecayJobId, EpisodeId, MemoryId};
use cortex_store::repo::{DecayJobRepo, EpisodeRepo, MemoryCandidate, MemoryRecord, MemoryRepo};
use cortex_store::Pool;
use serde_json::Value;
use super::{
DecayError, DecayResult, DECAY_COMPRESS_INPUT_INVALID_INVARIANT,
DECAY_COMPRESS_SOURCE_MISSING_INVARIANT, DECAY_SUMMARY_CLAIM_SEPARATOR,
DECAY_SUMMARY_MAX_CLAIM_BYTES, DECAY_SUMMARY_TRUNCATION_SUFFIX,
};
/// Trust tier attached to a memory's `authority` label.
///
/// Variant order doubles as trust order (`Derived` < `Candidate` < `Agent`
/// < `User`), so the derived `Ord` agrees with [`Self::trust_rank`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum AuthorityTier {
    Derived,
    Candidate,
    Agent,
    User,
}

impl AuthorityTier {
    /// Numeric trust rank of the tier; higher means more trusted.
    #[must_use]
    pub const fn trust_rank(self) -> u8 {
        match self {
            Self::Derived => 0,
            Self::Candidate => 1,
            Self::Agent => 2,
            Self::User => 3,
        }
    }

    /// Parses an authority label, ignoring ASCII case, so "user", "User",
    /// and "USER" all map to [`Self::User`]. Unknown labels fall back to
    /// [`Self::Derived`], the least-trusted tier, rather than erroring.
    #[must_use]
    pub fn parse_lenient(authority: &str) -> Self {
        if authority.eq_ignore_ascii_case("user") {
            Self::User
        } else if authority.eq_ignore_ascii_case("agent") {
            Self::Agent
        } else if authority.eq_ignore_ascii_case("candidate") {
            Self::Candidate
        } else {
            Self::Derived
        }
    }
}
/// Compresses several candidate memories into one derived summary memory.
///
/// Convenience wrapper that runs the compression without attributing it to a
/// decay job (`job_id = None`); see [`compress_candidate_memories_with_job`]
/// for the validation, aggregation, and supersession behavior.
///
/// # Errors
/// Returns `DecayError::Validation` when `source_memory_ids` is empty, when
/// `operator` is blank, or when any source memory cannot be found; storage
/// errors propagate from the underlying repos.
pub fn compress_candidate_memories(
    pool: &Pool,
    source_memory_ids: &[MemoryId],
    operator: &str,
) -> DecayResult<MemoryId> {
    compress_candidate_memories_with_job(pool, source_memory_ids, operator, None)
}
/// Compresses candidate memories into one summary memory, optionally tagging
/// the resulting supersession edges with a decay job id.
///
/// The sources are left in place; one supersession edge per source is
/// recorded so provenance survives the compression.
///
/// NOTE(review): `operator` is validated for non-emptiness but not otherwise
/// recorded here — confirm whether it should be attached to the supersession
/// edges or the summary itself.
///
/// # Errors
/// `DecayError::Validation` for empty input, a blank operator, or a missing
/// source memory; storage errors propagate from the repos.
pub fn compress_candidate_memories_with_job(
    pool: &Pool,
    source_memory_ids: &[MemoryId],
    operator: &str,
    job_id: Option<&DecayJobId>,
) -> DecayResult<MemoryId> {
    // Guard clauses: reject degenerate requests before touching storage.
    if source_memory_ids.is_empty() {
        return Err(DecayError::Validation(format!(
            "{DECAY_COMPRESS_INPUT_INVALID_INVARIANT}: source_memory_ids must be non-empty"
        )));
    }
    if operator.trim().is_empty() {
        return Err(DecayError::Validation(
            "operator label must be non-empty".into(),
        ));
    }

    let memory_repo = MemoryRepo::new(pool);
    // Resolve every source id; a single missing record aborts the whole run.
    let sources = source_memory_ids
        .iter()
        .map(|id| {
            memory_repo.get_by_id(id)?.ok_or_else(|| {
                DecayError::Validation(format!(
                    "{DECAY_COMPRESS_SOURCE_MISSING_INVARIANT}: memory {id} not found"
                ))
            })
        })
        .collect::<DecayResult<Vec<_>>>()?;

    let summary = build_memory_summary(&sources, source_memory_ids);
    let summary_id = summary.id;
    memory_repo.insert_candidate(&summary)?;

    // Record one supersession edge per source, all stamped with the same time.
    let recorded_at = Utc::now();
    let job_repo = DecayJobRepo::new(pool);
    for source in &sources {
        job_repo.record_memory_supersession(&source.id, &summary_id, job_id, recorded_at)?;
    }
    Ok(summary_id)
}
/// Compresses several episodes into one derived summary memory.
///
/// Convenience wrapper that runs the compression without attributing it to a
/// decay job (`job_id = None`); see [`compress_episodes_with_job`] for the
/// validation, aggregation, and supersession behavior.
///
/// # Errors
/// Returns `DecayError::Validation` when `source_episode_ids` is empty, when
/// `operator` is blank, or when any source episode cannot be found; storage
/// errors propagate from the underlying repos.
pub fn compress_episodes(
    pool: &Pool,
    source_episode_ids: &[EpisodeId],
    operator: &str,
) -> DecayResult<MemoryId> {
    compress_episodes_with_job(pool, source_episode_ids, operator, None)
}
/// Compresses episodes into one summary memory, optionally tagging the
/// resulting supersession edges with a decay job id.
///
/// The source episodes are left in place; one supersession edge per episode
/// is recorded so provenance survives the compression.
///
/// NOTE(review): `operator` is validated for non-emptiness but not otherwise
/// recorded here — confirm whether it should be persisted with the edges.
///
/// # Errors
/// `DecayError::Validation` for empty input, a blank operator, or a missing
/// source episode; storage errors propagate from the repos.
pub fn compress_episodes_with_job(
    pool: &Pool,
    source_episode_ids: &[EpisodeId],
    operator: &str,
    job_id: Option<&DecayJobId>,
) -> DecayResult<MemoryId> {
    // Guard clauses: reject degenerate requests before touching storage.
    if source_episode_ids.is_empty() {
        return Err(DecayError::Validation(format!(
            "{DECAY_COMPRESS_INPUT_INVALID_INVARIANT}: source_episode_ids must be non-empty"
        )));
    }
    if operator.trim().is_empty() {
        return Err(DecayError::Validation(
            "operator label must be non-empty".into(),
        ));
    }

    let episode_repo = EpisodeRepo::new(pool);
    // Resolve every source id; a single missing record aborts the whole run.
    let sources = source_episode_ids
        .iter()
        .map(|id| {
            episode_repo.get_by_id(id)?.ok_or_else(|| {
                DecayError::Validation(format!(
                    "{DECAY_COMPRESS_SOURCE_MISSING_INVARIANT}: episode {id} not found"
                ))
            })
        })
        .collect::<DecayResult<Vec<_>>>()?;

    let summary = build_episode_summary(&sources, source_episode_ids);
    let summary_id = summary.id;
    MemoryRepo::new(pool).insert_candidate(&summary)?;

    // Record one supersession edge per episode, all stamped with the same time.
    let recorded_at = Utc::now();
    let job_repo = DecayJobRepo::new(pool);
    for source in &sources {
        job_repo.record_episode_supersession(&source.id, &summary_id, job_id, recorded_at)?;
    }
    Ok(summary_id)
}
/// Builds a `summary`-typed memory candidate from candidate memory sources:
/// claims are concatenated, confidence is the pessimistic minimum, authority
/// is the lowest tier among the sources, and JSON provenance is the
/// deduplicated union of the sources' arrays.
fn build_memory_summary(sources: &[MemoryRecord], source_ids: &[MemoryId]) -> MemoryCandidate {
    let claim = concatenate_claims(sources.iter().map(|m| m.claim.as_str()));
    let confidence = pessimistic_min_confidence(sources.iter().map(|m| m.confidence));
    let authority = lowest_authority_label(sources.iter().map(|m| m.authority.as_str()));
    let domains = union_json_strings(sources.iter().map(|m| &m.domains_json));
    let source_events = union_json_strings(sources.iter().map(|m| &m.source_events_json));
    let mut source_episodes = union_json_strings(sources.iter().map(|m| &m.source_episodes_json));
    // When the sources carry no episode or event lineage at all, record the
    // source memory ids themselves so the summary's provenance is never empty.
    if json_array_is_empty(&source_episodes) && json_array_is_empty(&source_events) {
        source_episodes = Value::Array(
            source_ids
                .iter()
                .map(|id| Value::String(id.to_string()))
                .collect(),
        );
    }
    let stamp = Utc::now();
    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim,
        source_episodes_json: source_episodes,
        source_events_json: source_events,
        domains_json: domains,
        salience_json: Value::Object(serde_json::Map::new()),
        confidence,
        authority,
        applies_when_json: source_memory_provenance_envelope(source_ids),
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: stamp,
        updated_at: stamp,
    }
}
/// Builds a `summary`-typed memory candidate from episode sources: summaries
/// are concatenated into the claim, confidence is the pessimistic minimum,
/// and the source episode ids plus the union of source events become the
/// summary's provenance.
fn build_episode_summary(
    sources: &[cortex_store::repo::EpisodeRecord],
    source_ids: &[EpisodeId],
) -> MemoryCandidate {
    let stamp = Utc::now();
    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim: concatenate_claims(sources.iter().map(|e| e.summary.as_str())),
        source_episodes_json: source_episode_id_array(source_ids),
        source_events_json: union_json_strings(sources.iter().map(|e| &e.source_events_json)),
        domains_json: union_json_strings(sources.iter().map(|e| &e.domains_json)),
        salience_json: Value::Object(serde_json::Map::new()),
        confidence: pessimistic_min_confidence(sources.iter().map(|e| e.confidence)),
        // Episode summaries are machine-derived, so pin the lowest trust tier.
        authority: authority_label_for(AuthorityTier::Derived),
        applies_when_json: source_episode_provenance_envelope(source_ids),
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: stamp,
        updated_at: stamp,
    }
}
/// Joins claims with `DECAY_SUMMARY_CLAIM_SEPARATOR`, truncating the result
/// to at most `DECAY_SUMMARY_MAX_CLAIM_BYTES` bytes. Truncation is UTF-8
/// safe and appends `DECAY_SUMMARY_TRUNCATION_SUFFIX` to flag the cut.
fn concatenate_claims<'a, I: IntoIterator<Item = &'a str>>(claims: I) -> String {
    let joined = claims
        .into_iter()
        .collect::<Vec<_>>()
        .join(DECAY_SUMMARY_CLAIM_SEPARATOR);
    if joined.len() <= DECAY_SUMMARY_MAX_CLAIM_BYTES {
        return joined;
    }
    // Reserve room for the suffix, then move the cut point down to the
    // nearest char boundary so the slice below cannot split a UTF-8 sequence.
    // Index 0 is always a boundary, so the search cannot come up empty.
    let budget = DECAY_SUMMARY_MAX_CLAIM_BYTES.saturating_sub(DECAY_SUMMARY_TRUNCATION_SUFFIX.len());
    let cut = (0..=budget)
        .rev()
        .find(|&i| joined.is_char_boundary(i))
        .unwrap_or(0);
    format!("{}{}", &joined[..cut], DECAY_SUMMARY_TRUNCATION_SUFFIX)
}
/// Folds confidences to their minimum, clamped to `[0.0, 1.0]`.
///
/// A summary must never claim more confidence than its weakest source, so
/// the minimum (not the mean) is taken. NaN values are skipped rather than
/// allowed to poison the result, and an input with no usable values yields
/// 0.0 — the most pessimistic answer. (Seeding a plain `min` fold with
/// `f64::INFINITY` would instead make empty or all-NaN inputs clamp to 1.0,
/// i.e. maximum confidence.)
fn pessimistic_min_confidence<I: IntoIterator<Item = f64>>(values: I) -> f64 {
    values
        .into_iter()
        .filter(|v| !v.is_nan())
        .fold(None, |acc: Option<f64>, v| {
            Some(acc.map_or(v, |lowest| lowest.min(v)))
        })
        .unwrap_or(0.0)
        .clamp(0.0, 1.0)
}
fn lowest_authority_label<'a, I: IntoIterator<Item = &'a str>>(labels: I) -> String {
let mut min_tier = AuthorityTier::User;
let mut min_label: Option<String> = None;
for label in labels {
let tier = AuthorityTier::parse_lenient(label);
if tier <= min_tier {
min_tier = tier;
min_label = Some(label.to_string());
}
}
min_label.unwrap_or_else(|| authority_label_for(min_tier))
}
/// Canonical lowercase label for an authority tier, matching the labels
/// `AuthorityTier::parse_lenient` recognizes.
fn authority_label_for(tier: AuthorityTier) -> String {
    let label = match tier {
        AuthorityTier::Derived => "derived",
        AuthorityTier::Candidate => "candidate",
        AuthorityTier::Agent => "agent",
        AuthorityTier::User => "user",
    };
    label.to_owned()
}
/// Unions several JSON values into one deduplicated array, preserving
/// first-seen order. Each input array contributes its elements, a bare
/// string contributes itself, and every other JSON shape is ignored.
/// Deduplication keys on the serialized form (`canonical_key`).
fn union_json_strings<'a, I: IntoIterator<Item = &'a Value>>(arrays: I) -> Value {
    let mut seen: BTreeSet<String> = BTreeSet::new();
    let mut merged: Vec<Value> = Vec::new();
    for value in arrays {
        // Normalize every input into a flat list of candidate items.
        let items: Vec<Value> = match value {
            Value::Array(entries) => entries.clone(),
            Value::String(text) => vec![Value::String(text.clone())],
            _ => Vec::new(),
        };
        for item in items {
            if seen.insert(canonical_key(&item)) {
                merged.push(item);
            }
        }
    }
    Value::Array(merged)
}
fn json_array_is_empty(value: &Value) -> bool {
match value {
Value::Array(a) => a.is_empty(),
_ => true,
}
}
/// Serializes `value` to its canonical JSON text for use as a dedup key,
/// falling back to `Display` output if serialization fails.
fn canonical_key(value: &Value) -> String {
    match serde_json::to_string(value) {
        Ok(serialized) => serialized,
        Err(_) => value.to_string(),
    }
}
fn source_memory_provenance_envelope(source_ids: &[MemoryId]) -> Value {
serde_json::json!({
"summary_of_memories": source_ids
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>(),
})
}
fn source_episode_provenance_envelope(source_ids: &[EpisodeId]) -> Value {
serde_json::json!({
"summary_of_episodes": source_ids
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>(),
})
}
/// Converts episode ids into a JSON array of their string forms.
fn source_episode_id_array(source_ids: &[EpisodeId]) -> Value {
    let mut ids = Vec::with_capacity(source_ids.len());
    for id in source_ids {
        ids.push(Value::String(id.to_string()));
    }
    Value::Array(ids)
}
#[cfg(test)]
mod tests {
    use super::*;
    use cortex_core::{
        compose_policy_outcomes, PolicyContribution, PolicyDecision, PolicyOutcome, TraceId,
    };
    use cortex_store::migrate::apply_pending;
    use cortex_store::repo::{EpisodeRecord, EpisodeRepo, TraceRepo};
    use rusqlite::Connection;

    // Fresh in-memory SQLite database with all pending migrations applied,
    // so every test starts from a clean schema.
    fn seed_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }

    // Inserts a "semantic" memory candidate with the given claim, confidence,
    // authority, domains, and source-event provenance; returns its id.
    fn insert_test_memory(
        pool: &Pool,
        claim: &str,
        confidence: f64,
        authority: &str,
        domains: &[&str],
        source_event_ids: &[&str],
    ) -> MemoryId {
        let id = MemoryId::new();
        let candidate = MemoryCandidate {
            id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(
                source_event_ids
                    .iter()
                    .map(|s| Value::String((*s).into()))
                    .collect(),
            ),
            domains_json: Value::Array(
                domains.iter().map(|s| Value::String((*s).into())).collect(),
            ),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence,
            authority: authority.into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool)
            .insert_candidate(&candidate)
            .expect("insert");
        id
    }

    // Minimal all-Allow policy decision that episode insertion requires.
    fn dummy_episode_policy() -> PolicyDecision {
        compose_policy_outcomes(
            vec![
                PolicyContribution::new(
                    "episode.insert.source_event_lineage",
                    PolicyOutcome::Allow,
                    "test seed",
                )
                .expect("contribution"),
                PolicyContribution::new(
                    "episode.insert.redaction_status",
                    PolicyOutcome::Allow,
                    "test seed",
                )
                .expect("contribution"),
            ],
            None,
        )
    }

    // Opens a fresh trace, then inserts an "interpreted" episode tied to it
    // with the given summary, confidence, and events; returns the episode id.
    fn insert_test_episode(
        pool: &Pool,
        summary: &str,
        confidence: f64,
        source_event_ids: &[&str],
    ) -> EpisodeId {
        let trace_id = TraceId::new();
        let trace = cortex_core::Trace {
            id: trace_id,
            schema_version: 1,
            opened_at: Utc::now(),
            closed_at: None,
            event_ids: Vec::new(),
            trace_type: "test".into(),
            status: cortex_core::TraceStatus::Open,
        };
        TraceRepo::new(pool).open(&trace).expect("insert trace");
        let id = EpisodeId::new();
        let record = EpisodeRecord {
            id,
            trace_id,
            source_events_json: Value::Array(
                source_event_ids
                    .iter()
                    .map(|s| Value::String((*s).into()))
                    .collect(),
            ),
            summary: summary.into(),
            domains_json: Value::Array(vec![Value::String("test-domain".into())]),
            entities_json: Value::Array(Vec::new()),
            candidate_meaning: None,
            extracted_by_json: Value::Object(serde_json::Map::new()),
            confidence,
            status: "interpreted".into(),
        };
        EpisodeRepo::new(pool)
            .insert(&record, &dummy_episode_policy())
            .expect("insert episode");
        id
    }

    // Summary must carry the deduplicated union of the sources' events and
    // domains and be typed "summary".
    #[test]
    fn compress_candidate_memories_preserves_provenance() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a", "b"],
            &[
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F01",
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F02",
            ],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.7,
            "candidate",
            &["b", "c"],
            &[
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F02",
                "evt_01ARZ3NDEKTSV4RRFFQ69G5F03",
            ],
        );
        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        let events = match summary.source_events_json {
            Value::Array(v) => v,
            other => panic!("expected array, got {other:?}"),
        };
        let event_strings: BTreeSet<String> = events
            .iter()
            .filter_map(|v| v.as_str().map(str::to_string))
            .collect();
        assert!(event_strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5F01"));
        assert!(event_strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5F02"));
        assert!(event_strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5F03"));
        // The shared event appears once: 3 unique events across 4 inputs.
        assert_eq!(event_strings.len(), 3);
        let domains = match summary.domains_json {
            Value::Array(v) => v,
            other => panic!("expected array, got {other:?}"),
        };
        let domain_strings: BTreeSet<String> = domains
            .iter()
            .filter_map(|v| v.as_str().map(str::to_string))
            .collect();
        assert!(domain_strings.contains("a"));
        assert!(domain_strings.contains("b"));
        assert!(domain_strings.contains("c"));
        assert_eq!(domain_strings.len(), 3);
        assert_eq!(summary.memory_type, "summary");
    }

    // Summary confidence is the minimum of the sources, not the mean.
    #[test]
    fn compress_candidate_memories_confidence_is_pessimistic_min() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.95,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.42,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let m3 = insert_test_memory(
            &pool,
            "gamma",
            0.7,
            "candidate",
            &["c"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F03"],
        );
        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2, m3], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        assert!(
            (summary.confidence - 0.42).abs() < 1e-9,
            "got {}",
            summary.confidence
        );
    }

    // Summary authority is the lowest trust tier among the sources.
    #[test]
    fn compress_candidate_memories_authority_is_lowest_tier() {
        let pool = seed_pool();
        let m_user = insert_test_memory(
            &pool,
            "u",
            0.9,
            "user",
            &["x"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m_agent = insert_test_memory(
            &pool,
            "a",
            0.9,
            "agent",
            &["x"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let m_cand = insert_test_memory(
            &pool,
            "c",
            0.9,
            "candidate",
            &["x"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F03"],
        );
        let summary_id = compress_candidate_memories(&pool, &[m_user, m_agent, m_cand], "op-x")
            .expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        assert_eq!(summary.authority, "candidate");
    }

    // Compression supersedes sources but must never delete them.
    #[test]
    fn compress_candidate_memories_does_not_delete_sources() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.7,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let _summary = compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");
        let repo = MemoryRepo::new(&pool);
        assert!(
            repo.get_by_id(&m1).unwrap().is_some(),
            "source m1 must remain"
        );
        assert!(
            repo.get_by_id(&m2).unwrap().is_some(),
            "source m2 must remain"
        );
    }

    // One supersession edge per source memory is recorded against the summary.
    #[test]
    fn compress_candidate_memories_records_supersession_edges() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.7,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");
        let job_repo = DecayJobRepo::new(&pool);
        let sources = job_repo
            .list_memory_sources_for(&summary_id)
            .expect("list sources");
        let set: BTreeSet<String> = sources.iter().map(ToString::to_string).collect();
        assert!(set.contains(&m1.to_string()));
        assert!(set.contains(&m2.to_string()));
        assert_eq!(set.len(), 2);
    }

    // Episode compression keeps event provenance, takes the minimum
    // confidence, and records episode supersession edges.
    #[test]
    fn compress_episodes_preserves_event_provenance() {
        let pool = seed_pool();
        let e1 = insert_test_episode(
            &pool,
            "alpha episode",
            0.9,
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5E01"],
        );
        let e2 = insert_test_episode(
            &pool,
            "beta episode",
            0.6,
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5E02"],
        );
        let summary_id = compress_episodes(&pool, &[e1, e2], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        let events = match summary.source_events_json {
            Value::Array(v) => v,
            other => panic!("expected array, got {other:?}"),
        };
        let strings: BTreeSet<String> = events
            .iter()
            .filter_map(|v| v.as_str().map(str::to_string))
            .collect();
        assert!(strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5E01"));
        assert!(strings.contains("evt_01ARZ3NDEKTSV4RRFFQ69G5E02"));
        assert!(
            (summary.confidence - 0.6).abs() < 1e-9,
            "got {}",
            summary.confidence
        );
        let job_repo = DecayJobRepo::new(&pool);
        let sources = job_repo
            .list_episode_sources_for(&summary_id)
            .expect("list sources");
        assert_eq!(sources.len(), 2);
    }

    // Two max-length claims force truncation: the result stays within the
    // byte budget and ends with the truncation suffix.
    #[test]
    fn compress_truncates_claims_over_budget() {
        let pool = seed_pool();
        let long = "x".repeat(DECAY_SUMMARY_MAX_CLAIM_BYTES);
        let m1 = insert_test_memory(
            &pool,
            &long,
            0.9,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            &long,
            0.9,
            "candidate",
            &["b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let summary_id =
            compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress ok");
        let summary = MemoryRepo::new(&pool)
            .get_by_id(&summary_id)
            .unwrap()
            .unwrap();
        assert!(summary.claim.ends_with(DECAY_SUMMARY_TRUNCATION_SUFFIX));
        assert!(summary.claim.len() <= DECAY_SUMMARY_MAX_CLAIM_BYTES);
    }

    // An empty source list is rejected with the input-invalid invariant.
    #[test]
    fn compress_refuses_empty_sources() {
        let pool = seed_pool();
        let err = compress_candidate_memories(&pool, &[], "op-x").unwrap_err();
        match err {
            DecayError::Validation(msg) => {
                assert!(msg.contains(DECAY_COMPRESS_INPUT_INVALID_INVARIANT));
            }
            other => panic!("expected validation, got {other:?}"),
        }
    }

    // A whitespace-only operator label is rejected.
    #[test]
    fn compress_refuses_empty_operator() {
        let pool = seed_pool();
        let m = insert_test_memory(
            &pool,
            "alpha",
            0.8,
            "candidate",
            &["a"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let err = compress_candidate_memories(&pool, &[m], " ").unwrap_err();
        assert!(matches!(err, DecayError::Validation(_)));
    }

    // An unknown source id is rejected with the source-missing invariant.
    #[test]
    fn compress_refuses_missing_source() {
        let pool = seed_pool();
        let phantom = MemoryId::new();
        let err = compress_candidate_memories(&pool, &[phantom], "op-x").unwrap_err();
        match err {
            DecayError::Validation(msg) => {
                assert!(msg.contains(DECAY_COMPRESS_SOURCE_MISSING_INVARIANT));
            }
            other => panic!("expected validation, got {other:?}"),
        }
    }

    // Two runs over the same inputs produce identical summary content
    // (claim, confidence, authority, provenance), apart from fresh ids.
    #[test]
    fn compress_is_deterministic_for_same_inputs() {
        let pool = seed_pool();
        let m1 = insert_test_memory(
            &pool,
            "alpha",
            0.42,
            "candidate",
            &["a", "b"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F01"],
        );
        let m2 = insert_test_memory(
            &pool,
            "beta",
            0.8,
            "candidate",
            &["b", "c"],
            &["evt_01ARZ3NDEKTSV4RRFFQ69G5F02"],
        );
        let sum1 = compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress 1");
        let sum2 = compress_candidate_memories(&pool, &[m1, m2], "op-x").expect("compress 2");
        let repo = MemoryRepo::new(&pool);
        let r1 = repo.get_by_id(&sum1).unwrap().unwrap();
        let r2 = repo.get_by_id(&sum2).unwrap().unwrap();
        assert_eq!(r1.claim, r2.claim);
        assert_eq!(r1.confidence, r2.confidence);
        assert_eq!(r1.authority, r2.authority);
        assert_eq!(r1.source_events_json, r2.source_events_json);
        assert_eq!(r1.domains_json, r2.domains_json);
    }
}