use std::collections::BTreeSet;
use std::path::Path;
use chrono::Utc;
use cortex_core::{DecayJobId, EpisodeId, MemoryId};
use cortex_llm::{SummaryBackend, SummaryError, SummaryRequest, SummaryResponse};
use cortex_store::repo::{
DecayJobRecord, DecayJobRepo, EpisodeRecord, EpisodeRepo, MemoryCandidate, MemoryRecord,
MemoryRepo,
};
use cortex_store::Pool;
use ed25519_dalek::{Signature, Verifier, VerifyingKey};
use serde::Deserialize;
use serde_json::Value;
use super::{
DecayError, DecayJobKind, DecayResult, SummaryMethod, DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE,
DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION, DECAY_SUMMARY_MAX_CLAIM_BYTES,
};
/// Operator-signed attestation that authorizes one specific LLM-summary decay
/// job. Deserialized from a JSON envelope file supplied when the job is run.
#[derive(Debug, Clone, Deserialize)]
pub struct LlmSummaryOperatorAttestationEnvelope {
    /// Must equal `DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION`.
    pub schema_version: u16,
    /// Must equal `DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE`.
    pub purpose: String,
    /// Lowercase hex encoding of the operator's 32-byte Ed25519 verifying key.
    pub operator_verifying_key_hex: String,
    /// Human-readable identifier for the operator key; included in the signed
    /// bytes but not checked against any registry in this module.
    pub operator_key_id: String,
    /// Timestamp string recorded by the operator; part of the signed bytes but
    /// not otherwise validated here.
    pub signed_at: String,
    /// Decay job this attestation is bound to; must match the job being run.
    pub decay_job_id: String,
    /// Model the operator pinned; the backend must echo it back verbatim.
    pub model_name: String,
    /// `blake3:`-prefixed digest of the prompt template the operator approved.
    pub prompt_template_blake3: String,
    /// Lowercase hex encoding of the 64-byte Ed25519 signature over
    /// `canonical_signing_input`.
    pub signature_hex: String,
}
/// Which store the job's `source_ids_json` entries refer to, derived from the
/// job record's `kind_wire`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LlmSummarySourceKind {
    /// `candidate_compression`: sources are memory records.
    Memory,
    /// `episode_compression`: sources are episode records.
    Episode,
}
/// Runs a single LLM-summary decay job against its recorded sources.
///
/// Pipeline: validate the job's wire method, load and cryptographically verify
/// the operator attestation envelope, load the sources (memories or episodes
/// depending on `kind_wire`), call the summary backend, validate its response,
/// then persist the summary memory and record supersessions.
///
/// Returns the id of the newly created summary memory.
///
/// # Errors
/// * [`DecayError::LlmSummaryRequiresOperatorAttestation`] when no attestation
///   path is provided.
/// * [`DecayError::LlmSummaryAttestationRejected`] when the envelope fails to
///   load or verify.
/// * [`DecayError::LlmSummaryBackendCallFailed`] when the backend call or the
///   response checks fail.
/// * [`DecayError::Validation`] for malformed job records or missing sources.
pub fn run_llm_summary_job(
    pool: &Pool,
    job: &DecayJobRecord,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<MemoryId> {
    // This runner only accepts jobs recorded with the attestation-required
    // llm_summary wire method; anything else is an upstream dispatch error.
    let expected_wire = SummaryMethod::LlmSummary {
        operator_attestation_required: true,
    }
    .method_wire();
    if job.summary_method_wire != expected_wire {
        return Err(DecayError::Validation(format!(
            "run_llm_summary_job invoked on a job whose summary_method_wire is `{}` (expected `{expected_wire}`)",
            job.summary_method_wire,
        )));
    }
    // The attestation is mandatory: refuse before touching the store or
    // calling the backend.
    let path = operator_attestation.ok_or(DecayError::LlmSummaryRequiresOperatorAttestation)?;
    let envelope = load_envelope(path)?;
    verify_envelope_for_job(&envelope, &job.id)?;
    let source_kind = source_kind_from_job(job)?;
    match source_kind {
        LlmSummarySourceKind::Memory => {
            let source_ids = parse_memory_source_ids(job)?;
            let sources = load_memory_sources(pool, &source_ids)?;
            let (request, claims) = build_request_from_memory_sources(&envelope, job, &sources);
            let response = call_backend(summary_backend, &request)?;
            validate_response(&response, &envelope, &claims)?;
            let candidate = build_candidate_from_memory_sources(&sources, &source_ids, &response);
            // Memory-sourced job: supersede the source memories, no episodes.
            persist_summary_memory(pool, &candidate, job, &sources, &[])
        }
        LlmSummarySourceKind::Episode => {
            let source_ids = parse_episode_source_ids(job)?;
            let sources = load_episode_sources(pool, &source_ids)?;
            let (request, claims) = build_request_from_episode_sources(&envelope, job, &sources);
            let response = call_backend(summary_backend, &request)?;
            validate_response(&response, &envelope, &claims)?;
            let candidate = build_candidate_from_episode_sources(&sources, &source_ids, &response);
            // Episode-sourced job: supersede the source episodes, no memories.
            persist_summary_memory(pool, &candidate, job, &[], &sources)
        }
    }
}
/// Typed front door for [`run_llm_summary_job`]: dispatches on the job kind's
/// declared summary method and refuses kinds that are not LLM summaries.
pub fn run_llm_summary_job_typed(
    pool: &Pool,
    job: &DecayJobRecord,
    kind: &DecayJobKind,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<MemoryId> {
    // Kinds without any summary method cannot be summarized at all.
    let Some(method) = kind.summary_method() else {
        return Err(DecayError::Validation(
            "run_llm_summary_job_typed invoked on a kind that carries no summary method".into(),
        ));
    };
    match method {
        SummaryMethod::LlmSummary { .. } => {
            run_llm_summary_job(pool, job, operator_attestation, summary_backend)
        }
        SummaryMethod::DeterministicConcatenate => Err(DecayError::Validation(
            "run_llm_summary_job_typed invoked on a deterministic-concatenate job".into(),
        )),
    }
}
/// Maps the job record's `kind_wire` onto the source table it reads from.
fn source_kind_from_job(job: &DecayJobRecord) -> DecayResult<LlmSummarySourceKind> {
    let wire = job.kind_wire.as_str();
    if wire == "candidate_compression" {
        return Ok(LlmSummarySourceKind::Memory);
    }
    if wire == "episode_compression" {
        return Ok(LlmSummarySourceKind::Episode);
    }
    Err(DecayError::Validation(format!(
        "run_llm_summary_job invoked on a job whose kind_wire is `{wire}` (expected `candidate_compression` or `episode_compression`)",
    )))
}
/// Parses the job's `source_ids_json` as a non-empty list of memory ids.
fn parse_memory_source_ids(job: &DecayJobRecord) -> DecayResult<Vec<MemoryId>> {
    let Some(array) = job.source_ids_json.as_array() else {
        return Err(DecayError::Validation(
            "run_llm_summary_job: source_ids_json must be a JSON array".into(),
        ));
    };
    // Short-circuits on the first non-string or non-parsable entry.
    let ids = array
        .iter()
        .map(|entry| {
            let raw = entry.as_str().ok_or_else(|| {
                DecayError::Validation(
                    "run_llm_summary_job: source_ids_json entries must be strings".into(),
                )
            })?;
            raw.parse::<MemoryId>().map_err(|err| {
                DecayError::Validation(format!(
                    "run_llm_summary_job: source id `{raw}` is not a memory id: {err}",
                ))
            })
        })
        .collect::<DecayResult<Vec<_>>>()?;
    if ids.is_empty() {
        return Err(DecayError::Validation(
            "run_llm_summary_job: source_ids_json must contain at least one id".into(),
        ));
    }
    Ok(ids)
}
/// Parses the job's `source_ids_json` as a non-empty list of episode ids.
fn parse_episode_source_ids(job: &DecayJobRecord) -> DecayResult<Vec<EpisodeId>> {
    let Some(array) = job.source_ids_json.as_array() else {
        return Err(DecayError::Validation(
            "run_llm_summary_job: source_ids_json must be a JSON array".into(),
        ));
    };
    // Short-circuits on the first non-string or non-parsable entry.
    let ids = array
        .iter()
        .map(|entry| {
            let raw = entry.as_str().ok_or_else(|| {
                DecayError::Validation(
                    "run_llm_summary_job: source_ids_json entries must be strings".into(),
                )
            })?;
            raw.parse::<EpisodeId>().map_err(|err| {
                DecayError::Validation(format!(
                    "run_llm_summary_job: source id `{raw}` is not an episode id: {err}",
                ))
            })
        })
        .collect::<DecayResult<Vec<_>>>()?;
    if ids.is_empty() {
        return Err(DecayError::Validation(
            "run_llm_summary_job: source_ids_json must contain at least one id".into(),
        ));
    }
    Ok(ids)
}
/// Fetches every source memory; any missing id is a validation failure tied to
/// the compress-source-missing invariant.
fn load_memory_sources(pool: &Pool, ids: &[MemoryId]) -> DecayResult<Vec<MemoryRecord>> {
    let repo = MemoryRepo::new(pool);
    ids.iter()
        .map(|id| {
            repo.get_by_id(id)?.ok_or_else(|| {
                DecayError::Validation(format!(
                    "{}: memory {id} not found",
                    super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT
                ))
            })
        })
        .collect()
}
/// Fetches every source episode; any missing id is a validation failure tied
/// to the compress-source-missing invariant.
fn load_episode_sources(pool: &Pool, ids: &[EpisodeId]) -> DecayResult<Vec<EpisodeRecord>> {
    let repo = EpisodeRepo::new(pool);
    ids.iter()
        .map(|id| {
            repo.get_by_id(id)?.ok_or_else(|| {
                DecayError::Validation(format!(
                    "{}: episode {id} not found",
                    super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT
                ))
            })
        })
        .collect()
}
/// Builds the backend request from memory sources. Returns the request plus
/// the source claims, which the caller reuses for response validation.
fn build_request_from_memory_sources(
    envelope: &LlmSummaryOperatorAttestationEnvelope,
    job: &DecayJobRecord,
    sources: &[MemoryRecord],
) -> (SummaryRequest, Vec<String>) {
    let claims: Vec<String> = sources.iter().map(|record| record.claim.clone()).collect();
    // Model and prompt digest come from the attestation so the backend runs
    // exactly what the operator pinned.
    let model_name = envelope.model_name.clone();
    let prompt_template_blake3 = envelope.prompt_template_blake3.clone();
    let request = SummaryRequest {
        model_name,
        prompt_template_blake3,
        source_claims: claims.clone(),
        max_output_bytes: Some(DECAY_SUMMARY_MAX_CLAIM_BYTES),
        decay_job_id: Some(job.id.to_string()),
    };
    (request, claims)
}
/// Builds the backend request from episode sources (their summaries serve as
/// the claims). Returns the request plus the claims for response validation.
fn build_request_from_episode_sources(
    envelope: &LlmSummaryOperatorAttestationEnvelope,
    job: &DecayJobRecord,
    sources: &[EpisodeRecord],
) -> (SummaryRequest, Vec<String>) {
    let claims: Vec<String> = sources.iter().map(|record| record.summary.clone()).collect();
    // Model and prompt digest come from the attestation so the backend runs
    // exactly what the operator pinned.
    let model_name = envelope.model_name.clone();
    let prompt_template_blake3 = envelope.prompt_template_blake3.clone();
    let request = SummaryRequest {
        model_name,
        prompt_template_blake3,
        source_claims: claims.clone(),
        max_output_bytes: Some(DECAY_SUMMARY_MAX_CLAIM_BYTES),
        decay_job_id: Some(job.id.to_string()),
    };
    (request, claims)
}
/// Invokes the summary backend, converting any backend error into
/// `DecayError::LlmSummaryBackendCallFailed`.
fn call_backend(
    backend: &dyn SummaryBackend,
    request: &SummaryRequest,
) -> DecayResult<SummaryResponse> {
    match backend.summarize(request) {
        Ok(response) => Ok(response),
        Err(err) => Err(DecayError::LlmSummaryBackendCallFailed(
            format_backend_error(err),
        )),
    }
}
/// Renders a backend error into the detail string carried by
/// `DecayError::LlmSummaryBackendCallFailed` (the error's `Display` output).
fn format_backend_error(err: SummaryError) -> String {
    err.to_string()
}
/// Checks the backend's response against the operator attestation and the
/// claims that were sent.
///
/// # Errors
/// * `LlmSummaryBackendCallFailed` when the echoed model differs from the
///   pinned one, the claim is empty/whitespace-only, or the claim exceeds
///   `DECAY_SUMMARY_MAX_CLAIM_BYTES`.
/// * `Validation` when `source_claims` is empty (defensive invariant; the id
///   parsers already reject empty source lists).
fn validate_response(
    response: &SummaryResponse,
    envelope: &LlmSummaryOperatorAttestationEnvelope,
    source_claims: &[String],
) -> DecayResult<()> {
    // The backend must have run exactly the model the operator attested to.
    if response.model_name_echoed != envelope.model_name {
        return Err(DecayError::LlmSummaryBackendCallFailed(format!(
            "model mismatch: backend echoed model_name=`{}` but operator attestation pinned model_name=`{}`",
            response.model_name_echoed, envelope.model_name,
        )));
    }
    if response.claim.trim().is_empty() {
        return Err(DecayError::LlmSummaryBackendCallFailed(
            "backend produced an empty summary claim".into(),
        ));
    }
    // `len()` counts bytes, matching the "bytes" wording of the limit.
    if response.claim.len() > DECAY_SUMMARY_MAX_CLAIM_BYTES {
        return Err(DecayError::LlmSummaryBackendCallFailed(format!(
            "backend produced a summary claim of {} bytes (limit {DECAY_SUMMARY_MAX_CLAIM_BYTES})",
            response.claim.len(),
        )));
    }
    // Belt-and-braces: a summary of zero sources should be unreachable here.
    if source_claims.is_empty() {
        return Err(DecayError::Validation(
            "run_llm_summary_job: source_claims must not be empty after loading sources".into(),
        ));
    }
    Ok(())
}
/// Assembles the summary `MemoryCandidate` from memory sources: pessimistic
/// minimum confidence, the lowest authority label among the sources, and the
/// deduplicated union of their domains/events/episodes provenance.
fn build_candidate_from_memory_sources(
    sources: &[MemoryRecord],
    source_ids: &[MemoryId],
    response: &SummaryResponse,
) -> MemoryCandidate {
    let confidence = pessimistic_min_confidence(sources.iter().map(|m| m.confidence));
    let authority = lowest_authority_label(sources.iter().map(|m| m.authority.as_str()));
    let domains = union_json_strings(sources.iter().map(|m| &m.domains_json));
    let source_events = union_json_strings(sources.iter().map(|m| &m.source_events_json));
    let source_episodes = union_json_strings(sources.iter().map(|m| &m.source_episodes_json));
    // Fallback when the sources carry no provenance at all: record the source
    // memory ids themselves so the summary is never provenance-free.
    // NOTE(review): this stores *memory* ids in `source_episodes_json`;
    // presumably downstream consumers tolerate mixed id kinds there — confirm.
    let source_episodes =
        if json_array_is_empty(&source_episodes) && json_array_is_empty(&source_events) {
            Value::Array(
                source_ids
                    .iter()
                    .map(|id| Value::String(id.to_string()))
                    .collect(),
            )
        } else {
            source_episodes
        };
    // applies_when records which memories were summarized and by which model,
    // so the summary's origin is auditable.
    let applies_when = serde_json::json!({
        "summary_of_memories": source_ids
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>(),
        "llm_summary": {
            "model_name": response.model_name_echoed,
        }
    });
    let now = Utc::now();
    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim: response.claim.clone(),
        source_episodes_json: source_episodes,
        source_events_json: source_events,
        domains_json: domains,
        salience_json: Value::Object(serde_json::Map::new()),
        confidence,
        authority,
        applies_when_json: applies_when,
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: now,
        updated_at: now,
    }
}
/// Assembles the summary `MemoryCandidate` from episode sources: pessimistic
/// minimum confidence, fixed `derived` authority, and the deduplicated union
/// of the episodes' domains and source events.
fn build_candidate_from_episode_sources(
    sources: &[EpisodeRecord],
    source_ids: &[EpisodeId],
    response: &SummaryResponse,
) -> MemoryCandidate {
    let now = Utc::now();
    // Episode-derived summaries always carry the lowest authority tier.
    let authority = String::from("derived");
    let confidence = pessimistic_min_confidence(sources.iter().map(|episode| episode.confidence));
    let domains = union_json_strings(sources.iter().map(|episode| &episode.domains_json));
    let source_events =
        union_json_strings(sources.iter().map(|episode| &episode.source_events_json));
    // One stringified id list feeds both the provenance array and applies_when.
    let episode_id_strings: Vec<String> = source_ids.iter().map(ToString::to_string).collect();
    let source_episodes = Value::Array(
        episode_id_strings
            .iter()
            .cloned()
            .map(Value::String)
            .collect(),
    );
    let applies_when = serde_json::json!({
        "summary_of_episodes": episode_id_strings,
        "llm_summary": {
            "model_name": response.model_name_echoed,
        }
    });
    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim: response.claim.clone(),
        source_episodes_json: source_episodes,
        source_events_json: source_events,
        domains_json: domains,
        salience_json: Value::Object(serde_json::Map::new()),
        confidence,
        authority,
        applies_when_json: applies_when,
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: now,
        updated_at: now,
    }
}
/// Inserts the summary candidate and records a supersession row for every
/// source memory and episode, all attributed to the originating job.
///
/// Returns the new summary memory's id.
///
/// NOTE(review): the insert and the supersession records are separate repo
/// calls — confirm `Pool` provides transactional semantics, otherwise a
/// failure mid-loop leaves a summary without complete supersession records.
fn persist_summary_memory(
    pool: &Pool,
    candidate: &MemoryCandidate,
    job: &DecayJobRecord,
    memory_sources: &[MemoryRecord],
    episode_sources: &[EpisodeRecord],
) -> DecayResult<MemoryId> {
    let memory_repo = MemoryRepo::new(pool);
    memory_repo.insert_candidate(candidate)?;
    let summary_id = candidate.id;
    let job_repo = DecayJobRepo::new(pool);
    let now = Utc::now();
    // All supersessions share one timestamp so the batch reads as one event.
    for source in memory_sources {
        job_repo.record_memory_supersession(&source.id, &summary_id, Some(&job.id), now)?;
    }
    for source in episode_sources {
        job_repo.record_episode_supersession(&source.id, &summary_id, Some(&job.id), now)?;
    }
    Ok(summary_id)
}
/// Collapses per-source confidences into a single pessimistic value: the
/// minimum of all inputs, clamped to the valid `[0.0, 1.0]` range.
///
/// An empty input yields `0.0` (no evidence, no confidence). The previous
/// implementation seeded its fold with `f64::INFINITY`, so an empty iterator
/// clamped to `1.0` — the *most* optimistic value a pessimistic reducer could
/// return. Callers currently guarantee non-empty sources, but the contract is
/// now safe regardless.
fn pessimistic_min_confidence<I: IntoIterator<Item = f64>>(values: I) -> f64 {
    values
        .into_iter()
        .fold(None::<f64>, |acc, v| Some(acc.map_or(v, |a| a.min(v))))
        .map_or(0.0, |min| min.clamp(0.0, 1.0))
}
/// Returns the least-trusted authority label among `labels`, preserving the
/// original spelling of the winning label. Unrecognized labels rank lowest
/// ("derived" tier); ties keep the *last* occurrence. An empty input falls
/// back to `"user"`.
fn lowest_authority_label<'a, I: IntoIterator<Item = &'a str>>(labels: I) -> String {
    // Numeric trust rank: lower means less trusted.
    fn rank(label: &str) -> u8 {
        match label {
            "user" | "User" => 3,
            "agent" | "Agent" => 2,
            "candidate" | "Candidate" => 1,
            _ => 0,
        }
    }
    let mut lowest: Option<(u8, &str)> = None;
    for label in labels {
        let r = rank(label);
        // `r <= current` (not `<`) so a later equal-ranked label replaces the
        // earlier one, matching the original fold order.
        let replace = match lowest {
            Some((current, _)) => r <= current,
            None => true,
        };
        if replace {
            lowest = Some((r, label));
        }
    }
    match lowest {
        Some((_, label)) => label.to_string(),
        None => "user".into(),
    }
}
/// Unions JSON values into one deduplicated array, preserving first-seen
/// order. Array inputs contribute their elements, bare strings contribute
/// themselves, and every other JSON shape is ignored.
fn union_json_strings<'a, I: IntoIterator<Item = &'a Value>>(arrays: I) -> Value {
    let mut seen: BTreeSet<String> = BTreeSet::new();
    let mut merged: Vec<Value> = Vec::new();
    for value in arrays {
        // Normalize each input into a flat list of candidate elements.
        let candidates: Vec<Value> = match value {
            Value::Array(items) => items.clone(),
            Value::String(s) => vec![Value::String(s.clone())],
            _ => Vec::new(),
        };
        for item in candidates {
            // Dedup on the serialized form; first occurrence wins its slot.
            if seen.insert(canonical_key(&item)) {
                merged.push(item);
            }
        }
    }
    Value::Array(merged)
}
/// True when `value` is an empty JSON array; any non-array value also counts
/// as "empty" for provenance purposes.
fn json_array_is_empty(value: &Value) -> bool {
    if let Value::Array(items) = value {
        items.is_empty()
    } else {
        true
    }
}
/// Stable string key for deduplicating JSON values; falls back to the
/// `Display` form should serialization ever fail.
fn canonical_key(value: &Value) -> String {
    match serde_json::to_string(value) {
        Ok(serialized) => serialized,
        Err(_) => value.to_string(),
    }
}
fn load_envelope(path: &Path) -> DecayResult<LlmSummaryOperatorAttestationEnvelope> {
if !path.is_file() {
return Err(DecayError::LlmSummaryAttestationRejected(format!(
"envelope `{}` not found",
path.display()
)));
}
let raw = std::fs::read_to_string(path).map_err(|err| {
DecayError::LlmSummaryAttestationRejected(format!(
"envelope `{}` could not be read: {err}",
path.display()
))
})?;
serde_json::from_str(&raw).map_err(|err| {
DecayError::LlmSummaryAttestationRejected(format!(
"envelope `{}` is not valid JSON: {err}",
path.display()
))
})
}
/// Verifies an attestation envelope against the job it claims to authorize.
///
/// Checks, in order: schema version, purpose string, job-id binding, non-empty
/// model name, `blake3:`-prefixed prompt digest, well-formed 32-byte verifying
/// key, well-formed 64-byte signature, and finally the Ed25519 signature over
/// [`canonical_signing_input`] under the envelope's own declared key.
///
/// # Errors
/// `DecayError::LlmSummaryAttestationRejected` describing the first check that
/// failed.
fn verify_envelope_for_job(
    envelope: &LlmSummaryOperatorAttestationEnvelope,
    expected_job_id: &DecayJobId,
) -> DecayResult<()> {
    if envelope.schema_version != DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "schema_version {} (expected {})",
            envelope.schema_version, DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION,
        )));
    }
    if envelope.purpose != DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "purpose `{}` (expected `{}`)",
            envelope.purpose, DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE,
        )));
    }
    // Job binding prevents replaying a valid attestation on a different job.
    if envelope.decay_job_id != expected_job_id.to_string() {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "decay_job_id `{}` does not match job `{}`",
            envelope.decay_job_id, expected_job_id,
        )));
    }
    if envelope.model_name.trim().is_empty() {
        return Err(DecayError::LlmSummaryAttestationRejected(
            "model_name must be non-empty".into(),
        ));
    }
    if !envelope.prompt_template_blake3.starts_with("blake3:") {
        return Err(DecayError::LlmSummaryAttestationRejected(
            "prompt_template_blake3 must use the `blake3:` prefix".into(),
        ));
    }
    // Key material must be lowercase hex of exactly 32 bytes (Ed25519 point).
    let verifying_key_bytes =
        decode_lowercase_hex(&envelope.operator_verifying_key_hex).map_err(|detail| {
            DecayError::LlmSummaryAttestationRejected(format!(
                "operator_verifying_key_hex malformed: {detail}"
            ))
        })?;
    if verifying_key_bytes.len() != 32 {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "operator_verifying_key_hex must decode to 32 bytes; got {}",
            verifying_key_bytes.len()
        )));
    }
    let mut key_array = [0u8; 32];
    key_array.copy_from_slice(&verifying_key_bytes);
    // from_bytes can still reject byte strings that are not valid points.
    let verifying_key = VerifyingKey::from_bytes(&key_array).map_err(|err| {
        DecayError::LlmSummaryAttestationRejected(format!(
            "operator_verifying_key_hex did not parse: {err}"
        ))
    })?;
    let signature_bytes = decode_lowercase_hex(&envelope.signature_hex).map_err(|detail| {
        DecayError::LlmSummaryAttestationRejected(format!("signature_hex malformed: {detail}"))
    })?;
    if signature_bytes.len() != 64 {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "signature_hex must decode to 64 bytes; got {}",
            signature_bytes.len()
        )));
    }
    let mut sig_array = [0u8; 64];
    sig_array.copy_from_slice(&signature_bytes);
    let signature = Signature::from_bytes(&sig_array);
    // The signature is checked over the canonical byte encoding, not the raw
    // JSON, so field ordering/whitespace in the file cannot affect validity.
    let signing_input = canonical_signing_input(envelope);
    verifying_key
        .verify(&signing_input, &signature)
        .map_err(|_| {
            DecayError::LlmSummaryAttestationRejected(
                "Ed25519 signature did not verify under the declared operator key".into(),
            )
        })?;
    Ok(())
}
/// Deterministic byte string the operator signs (and verification recomputes)
/// for an LLM-summary attestation.
///
/// Layout: one domain-separation tag byte, the big-endian `schema_version`,
/// then each string field length-prefixed (via `push_lp`) in a fixed order.
/// The length prefixes make the encoding injective, so distinct field
/// combinations can never collide into the same signing input.
///
/// `operator_verifying_key_hex` and `signature_hex` are deliberately excluded:
/// key binding comes from verifying the signature under the declared key.
pub fn canonical_signing_input(env: &LlmSummaryOperatorAttestationEnvelope) -> Vec<u8> {
    // Domain tag prevents this signature from being replayed in any other
    // signing context that uses the same operator key.
    const DOMAIN_TAG_LLM_SUMMARY: u8 = 0x21;
    let mut out = Vec::new();
    out.push(DOMAIN_TAG_LLM_SUMMARY);
    out.extend_from_slice(&env.schema_version.to_be_bytes());
    push_lp(&mut out, env.purpose.as_bytes());
    push_lp(&mut out, env.operator_key_id.as_bytes());
    push_lp(&mut out, env.signed_at.as_bytes());
    push_lp(&mut out, env.decay_job_id.as_bytes());
    push_lp(&mut out, env.model_name.as_bytes());
    push_lp(&mut out, env.prompt_template_blake3.as_bytes());
    out
}
/// Appends `bytes` to `out` preceded by a u64 big-endian length prefix
/// (the framing used by `canonical_signing_input`).
fn push_lp(out: &mut Vec<u8>, bytes: &[u8]) {
    let length = bytes.len() as u64;
    out.extend_from_slice(&length.to_be_bytes());
    out.extend_from_slice(bytes);
}
/// Decodes a strictly lowercase hex string into bytes. Rejects odd-length
/// input and any byte outside `0-9a-f`, reporting the failing offset.
fn decode_lowercase_hex(input: &str) -> Result<Vec<u8>, String> {
    if input.len() % 2 != 0 {
        return Err(format!("odd hex length {}", input.len()));
    }
    input
        .as_bytes()
        .chunks_exact(2)
        .enumerate()
        .map(|(pair, digits)| {
            let offset = pair * 2;
            let hi = hex_nibble(digits[0])
                .ok_or_else(|| format!("invalid hex byte at offset {offset}"))?;
            let lo = hex_nibble(digits[1])
                .ok_or_else(|| format!("invalid hex byte at offset {}", offset + 1))?;
            Ok((hi << 4) | lo)
        })
        .collect()
}
/// Decodes one lowercase-hex ASCII digit to its 4-bit value. Uppercase hex
/// and all other bytes are rejected (the envelope format mandates lowercase).
fn hex_nibble(byte: u8) -> Option<u8> {
    if byte.is_ascii_digit() {
        Some(byte - b'0')
    } else if (b'a'..=b'f').contains(&byte) {
        Some(byte - b'a' + 10)
    } else {
        None
    }
}
// Unit tests: every refusal path of the LLM-summary runner (missing
// attestation, wrong method, noop backend, wrong-job envelope, tampered
// signature, malformed JSON) against an in-memory store.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use cortex_core::DecayJobId;
    use cortex_llm::NoopSummaryBackend;
    use cortex_store::migrate::apply_pending;
    use cortex_store::repo::DecayJobRecord;
    use ed25519_dalek::{Signer, SigningKey};
    use rusqlite::Connection;
    use serde_json::json;

    /// Fresh in-memory database with all migrations applied.
    fn open_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }

    /// Minimal in-progress `candidate_compression` job using the `llm_summary`
    /// wire method and a single placeholder memory id.
    fn sample_llm_job(id: DecayJobId) -> DecayJobRecord {
        let now = Utc::now();
        DecayJobRecord {
            id,
            kind_wire: "candidate_compression".into(),
            summary_method_wire: "llm_summary".into(),
            source_ids_json: json!(["mem_01ARZ3NDEKTSV4RRFFQ69G5FAV"]),
            state_wire: "in_progress".into(),
            state_reason: None,
            result_memory_id: None,
            scheduled_for: now,
            created_at: now,
            created_by: "operator:test".into(),
            updated_at: now,
        }
    }

    /// Lowercase hex encoding, matching what the envelope format requires.
    fn lowercase_hex(bytes: &[u8]) -> String {
        let mut s = String::with_capacity(bytes.len() * 2);
        for b in bytes {
            s.push_str(&format!("{b:02x}"));
        }
        s
    }

    /// Builds an envelope bound to `job_id` and signs it with `signing_key`,
    /// yielding a structure that passes `verify_envelope_for_job`.
    fn signed_envelope(
        job_id: &DecayJobId,
        model: &str,
        prompt_digest: &str,
        signing_key: &SigningKey,
    ) -> LlmSummaryOperatorAttestationEnvelope {
        let signed_at = Utc::now().to_rfc3339();
        // Build with an empty signature first: signature_hex is not part of
        // the canonical signing input, so it can be filled in afterwards.
        let envelope = LlmSummaryOperatorAttestationEnvelope {
            schema_version: DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION,
            purpose: DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE.into(),
            operator_verifying_key_hex: lowercase_hex(signing_key.verifying_key().as_bytes()),
            operator_key_id: "cortex-operator-tests".into(),
            signed_at,
            decay_job_id: job_id.to_string(),
            model_name: model.into(),
            prompt_template_blake3: prompt_digest.into(),
            signature_hex: String::new(),
        };
        let input = canonical_signing_input(&envelope);
        let signature = signing_key.sign(&input);
        let signature_hex = lowercase_hex(&signature.to_bytes());
        LlmSummaryOperatorAttestationEnvelope {
            signature_hex,
            ..envelope
        }
    }

    /// Serializes the envelope the way an operator tool would write it (the
    /// struct itself only derives `Deserialize`, so this is done by hand).
    fn envelope_as_json(env: &LlmSummaryOperatorAttestationEnvelope) -> serde_json::Value {
        serde_json::json!({
            "schema_version": env.schema_version,
            "purpose": env.purpose,
            "operator_verifying_key_hex": env.operator_verifying_key_hex,
            "operator_key_id": env.operator_key_id,
            "signed_at": env.signed_at,
            "decay_job_id": env.decay_job_id,
            "model_name": env.model_name,
            "prompt_template_blake3": env.prompt_template_blake3,
            "signature_hex": env.signature_hex,
        })
    }

    // No attestation path at all must refuse with the dedicated error variant
    // and surface the matching invariant.
    #[test]
    fn llm_summary_refuses_without_operator_attestation() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let job = sample_llm_job(id);
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, None, &backend).expect_err("must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryRequiresOperatorAttestation),
            "got {err:?}"
        );
        assert_eq!(
            err.invariant(),
            Some(super::super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
        );
    }

    // A job recorded with the deterministic method must not reach the LLM
    // runner; the wire-method guard rejects it before the attestation check.
    #[test]
    fn llm_summary_refuses_on_deterministic_method_record() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let mut job = sample_llm_job(id);
        job.summary_method_wire = "deterministic_concatenate".into();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, None, &backend).expect_err("must refuse");
        assert!(matches!(err, DecayError::Validation(_)), "got {err:?}");
    }

    // With a valid attestation and existing source, the noop backend's own
    // refusal must surface as a backend-call failure.
    #[test]
    fn llm_summary_returns_backend_not_configured_with_noop() {
        let pool = open_pool();
        let source_id = MemoryId::new();
        seed_candidate_memory(&pool, &source_id, "alpha");
        let id = DecayJobId::new();
        let mut job = sample_llm_job(id);
        job.source_ids_json = json!([source_id.to_string()]);
        let dir = std::env::temp_dir().join(format!("cortex-decay-test-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");
        let signing_key = SigningKey::from_bytes(&[7u8; 32]);
        let envelope = signed_envelope(
            &id,
            "claude-sonnet-4-7@1",
            "blake3:0000000000000000000000000000000000000000000000000000000000000000",
            &signing_key,
        );
        std::fs::write(
            &env_path,
            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
        )
        .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("noop backend must refuse");
        match err {
            DecayError::LlmSummaryBackendCallFailed(detail) => {
                assert!(
                    detail.contains("summary_backend_not_configured"),
                    "detail: {detail}"
                );
            }
            other => panic!("expected LlmSummaryBackendCallFailed, got {other:?}"),
        }
    }

    /// Inserts a minimal candidate memory so jobs have a loadable source.
    fn seed_candidate_memory(pool: &Pool, id: &MemoryId, claim: &str) {
        let candidate = MemoryCandidate {
            id: *id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(vec![Value::String(
                "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV".into(),
            )]),
            domains_json: Value::Array(vec![Value::String("t".into())]),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence: 0.7,
            authority: "candidate".into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool).insert_candidate(&candidate).unwrap();
    }

    // An envelope validly signed for a *different* job id must be rejected —
    // the job binding blocks attestation replay.
    #[test]
    fn llm_summary_rejects_envelope_for_wrong_job() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let other = DecayJobId::new();
        let job = sample_llm_job(id);
        let dir =
            std::env::temp_dir().join(format!("cortex-decay-test-wrong-job-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");
        let signing_key = SigningKey::from_bytes(&[3u8; 32]);
        let envelope = signed_envelope(
            &other,
            "claude-sonnet-4-7@1",
            "blake3:1111111111111111111111111111111111111111111111111111111111111111",
            &signing_key,
        );
        std::fs::write(
            &env_path,
            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
        )
        .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("envelope mismatch must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryAttestationRejected(_)),
            "got {err:?}"
        );
        assert_eq!(
            err.invariant(),
            Some(super::super::DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT)
        );
    }

    // Flipping the first signature byte must break Ed25519 verification.
    #[test]
    fn llm_summary_rejects_tampered_signature() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let job = sample_llm_job(id);
        let dir = std::env::temp_dir().join(format!("cortex-decay-test-tampered-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");
        let signing_key = SigningKey::from_bytes(&[1u8; 32]);
        let mut envelope = signed_envelope(
            &id,
            "claude-sonnet-4-7@1",
            "blake3:2222222222222222222222222222222222222222222222222222222222222222",
            &signing_key,
        );
        envelope.signature_hex.replace_range(0..2, "ff");
        std::fs::write(
            &env_path,
            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
        )
        .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("tampered signature must refuse");
        match err {
            DecayError::LlmSummaryAttestationRejected(detail) => {
                assert!(
                    detail.contains("signature") || detail.contains("did not verify"),
                    "detail: {detail}"
                );
            }
            other => panic!("expected LlmSummaryAttestationRejected, got {other:?}"),
        }
    }

    // Unparseable envelope files are rejected up front by `load_envelope`.
    #[test]
    fn llm_summary_rejects_malformed_envelope_json() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let job = sample_llm_job(id);
        let dir = std::env::temp_dir().join(format!("cortex-decay-test-bad-json-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");
        std::fs::write(&env_path, "{ this is not json").unwrap();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("malformed JSON must refuse");
        assert!(matches!(err, DecayError::LlmSummaryAttestationRejected(_)));
    }
}