Skip to main content

cortex_memory/decay/
summary.rs

1//! Phase 4.D LLM-summary execution path.
2//!
3//! ## Posture today
4//!
5//! `run_llm_summary_job` is **wired end-to-end**:
6//!
7//! 1. If `operator_attestation` is `None`, the call refuses with
8//!    [`DecayError::LlmSummaryRequiresOperatorAttestation`] under the
9//!    stable invariant
10//!    [`super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT`].
11//!    LLM summarisation is **operator-fired only** — the schema-migration
12//!    boundary is the precedent: a destructive / non-deterministic action
13//!    cannot be unattended.
14//!
15//! 2. If `operator_attestation` is `Some(path)`, the envelope at that path
16//!    is parsed and structurally validated (Ed25519 signature over the
17//!    canonical bytes, schema_version, purpose discriminator). Any
18//!    validation failure refuses with
19//!    [`DecayError::LlmSummaryAttestationRejected`] under the stable
20//!    invariant
21//!    [`super::DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT`].
22//!
23//! 3. With the envelope verified, the function loads source memories /
24//!    episodes from the store, builds a [`cortex_llm::SummaryRequest`]
25//!    carrying the operator-pinned `model_name` and
26//!    `prompt_template_blake3`, and calls
27//!    [`cortex_llm::SummaryBackend::summarize`]. Any backend refusal
28//!    (allowlist mismatch, prompt template mismatch, upstream failure,
29//!    output validation failure, or the noop default backend) surfaces
30//!    as [`DecayError::LlmSummaryBackendCallFailed`] under the stable
31//!    invariant
32//!    [`super::DECAY_LLM_SUMMARY_BACKEND_CALL_FAILED_INVARIANT`].
33//!
34//! 4. On a successful backend call, the function validates that the
35//!    backend's `model_name_echoed` byte-equals the envelope's
36//!    `model_name` (so a silently-routed provider cannot launder the
37//!    attestation pin) and that the produced claim is non-empty and
38//!    within the deterministic byte budget. It then composes a
39//!    candidate-tier [`MemoryCandidate`] under the same pessimistic-merge
40//!    posture as the deterministic compressor
41//!    ([`super::compress`]): `confidence = min(source confidences)`,
42//!    `authority` = lowest-trust authority among sources, provenance =
43//!    deduplicated union of source provenance arrays. The candidate is
44//!    persisted with [`MemoryRepo::insert_candidate`] and supersession
45//!    edges are recorded for every source via
46//!    [`DecayJobRepo::record_memory_supersession`] (for memory sources)
47//!    or [`DecayJobRepo::record_episode_supersession`] (for episode
48//!    sources).
49//!
50//! ## CLI default posture
51//!
52//! The CLI surface (`cortex decay run --operator-attestation <PATH>`)
53//! injects a [`cortex_llm::NoopSummaryBackend`] today. That means a
54//! production LLM-summary run will get all the way through envelope
55//! verification and then refuse with
56//! [`DecayError::LlmSummaryBackendCallFailed`] (reason
57//! `summary_backend_not_configured`). Operators who want to actually
58//! produce summaries inject a hosted backend programmatically, or pass
59//! a [`cortex_llm::ReplaySummaryBackend`] fixture in CI.
60//!
61//! ## Doctrine note
62//!
63//! **An LLM summary is candidate-tier evidence only.** It is NEVER
64//! directly promoted to principle: the standard
65//! `cortex_memory::lifecycle::accept_candidate` ceremony still applies
66//! (proof closure, contradiction scan, semantic trust, operator temporal
67//! authority). The LLM call itself is gated by operator attestation, and
68//! the per-call attestation binds the operator's signing key to:
69//!
70//! - the **model name** (so a captured envelope cannot authorise a
71//!   different model);
72//! - the **source ids** being compressed — bound via the decay job id, so
73//!   a captured envelope cannot be replayed against a different source
74//!   set;
75//! - the **prompt template digest** (so a captured envelope cannot
76//!   silently swap prompts).
77
78use std::collections::BTreeSet;
79use std::path::Path;
80
81use chrono::Utc;
82use cortex_core::{DecayJobId, EpisodeId, MemoryId};
83use cortex_llm::{SummaryBackend, SummaryError, SummaryRequest, SummaryResponse};
84use cortex_store::repo::{
85    DecayJobRecord, DecayJobRepo, EpisodeRecord, EpisodeRepo, MemoryCandidate, MemoryRecord,
86    MemoryRepo,
87};
88use cortex_store::Pool;
89use ed25519_dalek::{Signature, Verifier, VerifyingKey};
90use serde::Deserialize;
91use serde_json::Value;
92
93use super::{
94    DecayError, DecayJobKind, DecayResult, SummaryMethod, DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE,
95    DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION, DECAY_SUMMARY_MAX_CLAIM_BYTES,
96};
97
98/// On-disk envelope for an operator attestation authorising an LLM summary
99/// job. Structurally analogous to the migration-attestation envelope: the
100/// Ed25519 signature is computed over a domain-tagged length-prefixed
101/// binary encoding of the non-signature fields.
102#[derive(Debug, Clone, Deserialize)]
103pub struct LlmSummaryOperatorAttestationEnvelope {
104    /// Envelope schema version. Must equal
105    /// [`super::DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION`].
106    pub schema_version: u16,
107    /// Purpose discriminator. Must equal
108    /// [`super::DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE`].
109    pub purpose: String,
110    /// Hex-encoded Ed25519 verifying key (32 bytes, lowercase, no separator).
111    pub operator_verifying_key_hex: String,
112    /// Free-form operator key id (matches `cortex-identity`).
113    pub operator_key_id: String,
114    /// RFC3339 timestamp the envelope was signed at.
115    pub signed_at: String,
116    /// Decay job id this attestation authorises. Binds the envelope to a
117    /// single job so a captured attestation cannot be replayed against a
118    /// different job.
119    pub decay_job_id: String,
120    /// Pinned LLM model name (e.g. `claude-sonnet-4-7@1`). The future
121    /// backend MUST refuse if the resolved model name differs.
122    pub model_name: String,
123    /// Blake3 digest of the prompt template that will be sent to the LLM.
124    /// Stored as `blake3:<hex>` for forward compatibility with other hash
125    /// families.
126    pub prompt_template_blake3: String,
127    /// Hex-encoded Ed25519 signature (64 bytes, lowercase).
128    pub signature_hex: String,
129}
130
131/// Source kind for an LLM-summary decay job. Determined by the job's
132/// `kind_wire` discriminator and used to drive the per-source lookup and
133/// the per-source supersession recorder call.
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
135enum LlmSummarySourceKind {
136    /// Sources are candidate memories (job's
137    /// `kind_wire == "candidate_compression"`).
138    Memory,
139    /// Sources are episodes (job's `kind_wire == "episode_compression"`).
140    Episode,
141}
142
143/// Run an LLM-summary decay job.
144///
145/// The function:
146///
147/// 1. validates that the supplied [`DecayJobRecord`] carries an
148///    LLM-summary method (a deterministic-method job would be a contract
149///    violation here — the runner dispatches those to `compress.rs`);
150/// 2. refuses without an `operator_attestation` path
151///    ([`DecayError::LlmSummaryRequiresOperatorAttestation`]);
152/// 3. parses the envelope, verifies its Ed25519 signature against the
153///    declared verifying key, and refuses on any structural / signature
154///    mismatch
155///    ([`DecayError::LlmSummaryAttestationRejected`]);
156/// 4. loads source memories or episodes, calls
157///    [`SummaryBackend::summarize`] under the operator-pinned
158///    `model_name` and `prompt_template_blake3`, and refuses on any
159///    backend error or output validation failure
160///    ([`DecayError::LlmSummaryBackendCallFailed`]);
161/// 5. composes a candidate-tier summary memory under the same pessimistic
162///    merge posture as the deterministic compressor, persists it via
163///    [`MemoryRepo::insert_candidate`], and records supersession edges
164///    for every source.
165///
166/// Returns the produced summary memory's id on success.
167pub fn run_llm_summary_job(
168    pool: &Pool,
169    job: &DecayJobRecord,
170    operator_attestation: Option<&Path>,
171    summary_backend: &dyn SummaryBackend,
172) -> DecayResult<MemoryId> {
173    // Cross-check that this record really carries an LLM summary method.
174    // The runner only dispatches LLM jobs here, but the typed boundary
175    // catches a future refactor that forgets to filter.
176    let expected_wire = SummaryMethod::LlmSummary {
177        operator_attestation_required: true,
178    }
179    .method_wire();
180    if job.summary_method_wire != expected_wire {
181        return Err(DecayError::Validation(format!(
182            "run_llm_summary_job invoked on a job whose summary_method_wire is `{}` (expected `{expected_wire}`)",
183            job.summary_method_wire,
184        )));
185    }
186
187    let path = operator_attestation.ok_or(DecayError::LlmSummaryRequiresOperatorAttestation)?;
188    let envelope = load_envelope(path)?;
189    verify_envelope_for_job(&envelope, &job.id)?;
190
191    let source_kind = source_kind_from_job(job)?;
192    match source_kind {
193        LlmSummarySourceKind::Memory => {
194            let source_ids = parse_memory_source_ids(job)?;
195            let sources = load_memory_sources(pool, &source_ids)?;
196            let (request, claims) = build_request_from_memory_sources(&envelope, job, &sources);
197            let response = call_backend(summary_backend, &request)?;
198            validate_response(&response, &envelope, &claims)?;
199            let candidate = build_candidate_from_memory_sources(&sources, &source_ids, &response);
200            persist_summary_memory(pool, &candidate, job, &sources, &[])
201        }
202        LlmSummarySourceKind::Episode => {
203            let source_ids = parse_episode_source_ids(job)?;
204            let sources = load_episode_sources(pool, &source_ids)?;
205            let (request, claims) = build_request_from_episode_sources(&envelope, job, &sources);
206            let response = call_backend(summary_backend, &request)?;
207            validate_response(&response, &envelope, &claims)?;
208            let candidate = build_candidate_from_episode_sources(&sources, &source_ids, &response);
209            persist_summary_memory(pool, &candidate, job, &[], &sources)
210        }
211    }
212}
213
214/// Run an LLM-summary decay job for a typed [`DecayJobKind`]. Convenience
215/// wrapper used by callers that already hold a typed [`super::DecayJob`].
216pub fn run_llm_summary_job_typed(
217    pool: &Pool,
218    job: &DecayJobRecord,
219    kind: &DecayJobKind,
220    operator_attestation: Option<&Path>,
221    summary_backend: &dyn SummaryBackend,
222) -> DecayResult<MemoryId> {
223    match kind.summary_method() {
224        Some(SummaryMethod::LlmSummary { .. }) => {
225            run_llm_summary_job(pool, job, operator_attestation, summary_backend)
226        }
227        Some(SummaryMethod::DeterministicConcatenate) => Err(DecayError::Validation(
228            "run_llm_summary_job_typed invoked on a deterministic-concatenate job".into(),
229        )),
230        None => Err(DecayError::Validation(
231            "run_llm_summary_job_typed invoked on a kind that carries no summary method".into(),
232        )),
233    }
234}
235
236fn source_kind_from_job(job: &DecayJobRecord) -> DecayResult<LlmSummarySourceKind> {
237    match job.kind_wire.as_str() {
238        "candidate_compression" => Ok(LlmSummarySourceKind::Memory),
239        "episode_compression" => Ok(LlmSummarySourceKind::Episode),
240        other => Err(DecayError::Validation(format!(
241            "run_llm_summary_job invoked on a job whose kind_wire is `{other}` (expected `candidate_compression` or `episode_compression`)",
242        ))),
243    }
244}
245
246fn parse_memory_source_ids(job: &DecayJobRecord) -> DecayResult<Vec<MemoryId>> {
247    let array = job.source_ids_json.as_array().ok_or_else(|| {
248        DecayError::Validation("run_llm_summary_job: source_ids_json must be a JSON array".into())
249    })?;
250    let mut out = Vec::with_capacity(array.len());
251    for value in array {
252        let raw = value.as_str().ok_or_else(|| {
253            DecayError::Validation(
254                "run_llm_summary_job: source_ids_json entries must be strings".into(),
255            )
256        })?;
257        let id = raw.parse::<MemoryId>().map_err(|err| {
258            DecayError::Validation(format!(
259                "run_llm_summary_job: source id `{raw}` is not a memory id: {err}",
260            ))
261        })?;
262        out.push(id);
263    }
264    if out.is_empty() {
265        return Err(DecayError::Validation(
266            "run_llm_summary_job: source_ids_json must contain at least one id".into(),
267        ));
268    }
269    Ok(out)
270}
271
272fn parse_episode_source_ids(job: &DecayJobRecord) -> DecayResult<Vec<EpisodeId>> {
273    let array = job.source_ids_json.as_array().ok_or_else(|| {
274        DecayError::Validation("run_llm_summary_job: source_ids_json must be a JSON array".into())
275    })?;
276    let mut out = Vec::with_capacity(array.len());
277    for value in array {
278        let raw = value.as_str().ok_or_else(|| {
279            DecayError::Validation(
280                "run_llm_summary_job: source_ids_json entries must be strings".into(),
281            )
282        })?;
283        let id = raw.parse::<EpisodeId>().map_err(|err| {
284            DecayError::Validation(format!(
285                "run_llm_summary_job: source id `{raw}` is not an episode id: {err}",
286            ))
287        })?;
288        out.push(id);
289    }
290    if out.is_empty() {
291        return Err(DecayError::Validation(
292            "run_llm_summary_job: source_ids_json must contain at least one id".into(),
293        ));
294    }
295    Ok(out)
296}
297
298fn load_memory_sources(pool: &Pool, ids: &[MemoryId]) -> DecayResult<Vec<MemoryRecord>> {
299    let repo = MemoryRepo::new(pool);
300    let mut out = Vec::with_capacity(ids.len());
301    for id in ids {
302        match repo.get_by_id(id)? {
303            Some(record) => out.push(record),
304            None => {
305                return Err(DecayError::Validation(format!(
306                    "{}: memory {id} not found",
307                    super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT
308                )));
309            }
310        }
311    }
312    Ok(out)
313}
314
315fn load_episode_sources(pool: &Pool, ids: &[EpisodeId]) -> DecayResult<Vec<EpisodeRecord>> {
316    let repo = EpisodeRepo::new(pool);
317    let mut out = Vec::with_capacity(ids.len());
318    for id in ids {
319        match repo.get_by_id(id)? {
320            Some(record) => out.push(record),
321            None => {
322                return Err(DecayError::Validation(format!(
323                    "{}: episode {id} not found",
324                    super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT
325                )));
326            }
327        }
328    }
329    Ok(out)
330}
331
332fn build_request_from_memory_sources(
333    envelope: &LlmSummaryOperatorAttestationEnvelope,
334    job: &DecayJobRecord,
335    sources: &[MemoryRecord],
336) -> (SummaryRequest, Vec<String>) {
337    let claims: Vec<String> = sources.iter().map(|m| m.claim.clone()).collect();
338    let request = SummaryRequest {
339        model_name: envelope.model_name.clone(),
340        prompt_template_blake3: envelope.prompt_template_blake3.clone(),
341        source_claims: claims.clone(),
342        max_output_bytes: Some(DECAY_SUMMARY_MAX_CLAIM_BYTES),
343        decay_job_id: Some(job.id.to_string()),
344    };
345    (request, claims)
346}
347
348fn build_request_from_episode_sources(
349    envelope: &LlmSummaryOperatorAttestationEnvelope,
350    job: &DecayJobRecord,
351    sources: &[EpisodeRecord],
352) -> (SummaryRequest, Vec<String>) {
353    let claims: Vec<String> = sources.iter().map(|e| e.summary.clone()).collect();
354    let request = SummaryRequest {
355        model_name: envelope.model_name.clone(),
356        prompt_template_blake3: envelope.prompt_template_blake3.clone(),
357        source_claims: claims.clone(),
358        max_output_bytes: Some(DECAY_SUMMARY_MAX_CLAIM_BYTES),
359        decay_job_id: Some(job.id.to_string()),
360    };
361    (request, claims)
362}
363
364fn call_backend(
365    backend: &dyn SummaryBackend,
366    request: &SummaryRequest,
367) -> DecayResult<SummaryResponse> {
368    backend
369        .summarize(request)
370        .map_err(|err| DecayError::LlmSummaryBackendCallFailed(format_backend_error(err)))
371}
372
373fn format_backend_error(err: SummaryError) -> String {
374    // The Display impl on SummaryError already prepends the discriminator
375    // token (e.g. `summary_backend_not_configured`), so the rendered
376    // message is grep-friendly verbatim.
377    err.to_string()
378}
379
380fn validate_response(
381    response: &SummaryResponse,
382    envelope: &LlmSummaryOperatorAttestationEnvelope,
383    source_claims: &[String],
384) -> DecayResult<()> {
385    if response.model_name_echoed != envelope.model_name {
386        return Err(DecayError::LlmSummaryBackendCallFailed(format!(
387            "model mismatch: backend echoed model_name=`{}` but operator attestation pinned model_name=`{}`",
388            response.model_name_echoed, envelope.model_name,
389        )));
390    }
391    if response.claim.trim().is_empty() {
392        return Err(DecayError::LlmSummaryBackendCallFailed(
393            "backend produced an empty summary claim".into(),
394        ));
395    }
396    if response.claim.len() > DECAY_SUMMARY_MAX_CLAIM_BYTES {
397        return Err(DecayError::LlmSummaryBackendCallFailed(format!(
398            "backend produced a summary claim of {} bytes (limit {DECAY_SUMMARY_MAX_CLAIM_BYTES})",
399            response.claim.len(),
400        )));
401    }
402    if source_claims.is_empty() {
403        return Err(DecayError::Validation(
404            "run_llm_summary_job: source_claims must not be empty after loading sources".into(),
405        ));
406    }
407    Ok(())
408}
409
410fn build_candidate_from_memory_sources(
411    sources: &[MemoryRecord],
412    source_ids: &[MemoryId],
413    response: &SummaryResponse,
414) -> MemoryCandidate {
415    // Mirrors the deterministic compressor's pessimistic-merge posture
416    // (see `super::compress::build_memory_summary`). The only material
417    // difference is the `claim`: instead of the deterministic
418    // concatenation we use the backend's produced summary text.
419    let confidence = pessimistic_min_confidence(sources.iter().map(|m| m.confidence));
420    let authority = lowest_authority_label(sources.iter().map(|m| m.authority.as_str()));
421
422    let domains = union_json_strings(sources.iter().map(|m| &m.domains_json));
423    let source_events = union_json_strings(sources.iter().map(|m| &m.source_events_json));
424    let source_episodes = union_json_strings(sources.iter().map(|m| &m.source_episodes_json));
425
426    // The store enforces "at least one of source_episodes / source_events
427    // is non-empty"; if both union to empty we fall back to encoding the
428    // source memory ids into source_episodes_json (last-resort lineage
429    // anchor) — same posture as the deterministic compressor.
430    let source_episodes =
431        if json_array_is_empty(&source_episodes) && json_array_is_empty(&source_events) {
432            Value::Array(
433                source_ids
434                    .iter()
435                    .map(|id| Value::String(id.to_string()))
436                    .collect(),
437            )
438        } else {
439            source_episodes
440        };
441
442    let applies_when = serde_json::json!({
443        "summary_of_memories": source_ids
444            .iter()
445            .map(ToString::to_string)
446            .collect::<Vec<_>>(),
447        "llm_summary": {
448            "model_name": response.model_name_echoed,
449        }
450    });
451
452    let now = Utc::now();
453    MemoryCandidate {
454        id: MemoryId::new(),
455        memory_type: "summary".into(),
456        claim: response.claim.clone(),
457        source_episodes_json: source_episodes,
458        source_events_json: source_events,
459        domains_json: domains,
460        salience_json: Value::Object(serde_json::Map::new()),
461        confidence,
462        authority,
463        applies_when_json: applies_when,
464        does_not_apply_when_json: Value::Array(Vec::new()),
465        created_at: now,
466        updated_at: now,
467    }
468}
469
470fn build_candidate_from_episode_sources(
471    sources: &[EpisodeRecord],
472    source_ids: &[EpisodeId],
473    response: &SummaryResponse,
474) -> MemoryCandidate {
475    let confidence = pessimistic_min_confidence(sources.iter().map(|e| e.confidence));
476    // Episodes do not carry an explicit authority tier; fold to "derived"
477    // conservatively (lowest trust), matching the deterministic episode
478    // compressor.
479    let authority = "derived".to_string();
480
481    let domains = union_json_strings(sources.iter().map(|e| &e.domains_json));
482    let source_events = union_json_strings(sources.iter().map(|e| &e.source_events_json));
483    let source_episodes = Value::Array(
484        source_ids
485            .iter()
486            .map(|id| Value::String(id.to_string()))
487            .collect(),
488    );
489
490    let applies_when = serde_json::json!({
491        "summary_of_episodes": source_ids
492            .iter()
493            .map(ToString::to_string)
494            .collect::<Vec<_>>(),
495        "llm_summary": {
496            "model_name": response.model_name_echoed,
497        }
498    });
499
500    let now = Utc::now();
501    MemoryCandidate {
502        id: MemoryId::new(),
503        memory_type: "summary".into(),
504        claim: response.claim.clone(),
505        source_episodes_json: source_episodes,
506        source_events_json: source_events,
507        domains_json: domains,
508        salience_json: Value::Object(serde_json::Map::new()),
509        confidence,
510        authority,
511        applies_when_json: applies_when,
512        does_not_apply_when_json: Value::Array(Vec::new()),
513        created_at: now,
514        updated_at: now,
515    }
516}
517
518fn persist_summary_memory(
519    pool: &Pool,
520    candidate: &MemoryCandidate,
521    job: &DecayJobRecord,
522    memory_sources: &[MemoryRecord],
523    episode_sources: &[EpisodeRecord],
524) -> DecayResult<MemoryId> {
525    let memory_repo = MemoryRepo::new(pool);
526    memory_repo.insert_candidate(candidate)?;
527    let summary_id = candidate.id;
528
529    let job_repo = DecayJobRepo::new(pool);
530    let now = Utc::now();
531    for source in memory_sources {
532        job_repo.record_memory_supersession(&source.id, &summary_id, Some(&job.id), now)?;
533    }
534    for source in episode_sources {
535        job_repo.record_episode_supersession(&source.id, &summary_id, Some(&job.id), now)?;
536    }
537    Ok(summary_id)
538}
539
540fn pessimistic_min_confidence<I: IntoIterator<Item = f64>>(values: I) -> f64 {
541    values
542        .into_iter()
543        .fold(f64::INFINITY, |acc, v| acc.min(v))
544        .clamp(0.0, 1.0)
545}
546
547fn lowest_authority_label<'a, I: IntoIterator<Item = &'a str>>(labels: I) -> String {
548    // Mirror compress.rs trust ladder: derived < candidate < agent < user.
549    #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
550    enum Tier {
551        Derived,
552        Candidate,
553        Agent,
554        User,
555    }
556    fn parse(label: &str) -> Tier {
557        match label {
558            "user" | "User" => Tier::User,
559            "agent" | "Agent" => Tier::Agent,
560            "candidate" | "Candidate" => Tier::Candidate,
561            _ => Tier::Derived,
562        }
563    }
564    let mut min_tier = Tier::User;
565    let mut min_label: Option<String> = None;
566    for label in labels {
567        let tier = parse(label);
568        if tier <= min_tier {
569            min_tier = tier;
570            min_label = Some(label.to_string());
571        }
572    }
573    min_label.unwrap_or_else(|| match min_tier {
574        Tier::Derived => "derived".into(),
575        Tier::Candidate => "candidate".into(),
576        Tier::Agent => "agent".into(),
577        Tier::User => "user".into(),
578    })
579}
580
581fn union_json_strings<'a, I: IntoIterator<Item = &'a Value>>(arrays: I) -> Value {
582    let mut seen: BTreeSet<String> = BTreeSet::new();
583    let mut ordered: Vec<Value> = Vec::new();
584    for value in arrays {
585        match value {
586            Value::Array(items) => {
587                for item in items {
588                    let key = canonical_key(item);
589                    if seen.insert(key) {
590                        ordered.push(item.clone());
591                    }
592                }
593            }
594            Value::String(s) => {
595                let v = Value::String(s.clone());
596                let key = canonical_key(&v);
597                if seen.insert(key) {
598                    ordered.push(v);
599                }
600            }
601            _ => {}
602        }
603    }
604    Value::Array(ordered)
605}
606
607fn json_array_is_empty(value: &Value) -> bool {
608    match value {
609        Value::Array(a) => a.is_empty(),
610        _ => true,
611    }
612}
613
614fn canonical_key(value: &Value) -> String {
615    serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
616}
617
618fn load_envelope(path: &Path) -> DecayResult<LlmSummaryOperatorAttestationEnvelope> {
619    if !path.is_file() {
620        return Err(DecayError::LlmSummaryAttestationRejected(format!(
621            "envelope `{}` not found",
622            path.display()
623        )));
624    }
625    let raw = std::fs::read_to_string(path).map_err(|err| {
626        DecayError::LlmSummaryAttestationRejected(format!(
627            "envelope `{}` could not be read: {err}",
628            path.display()
629        ))
630    })?;
631    serde_json::from_str(&raw).map_err(|err| {
632        DecayError::LlmSummaryAttestationRejected(format!(
633            "envelope `{}` is not valid JSON: {err}",
634            path.display()
635        ))
636    })
637}
638
639fn verify_envelope_for_job(
640    envelope: &LlmSummaryOperatorAttestationEnvelope,
641    expected_job_id: &DecayJobId,
642) -> DecayResult<()> {
643    if envelope.schema_version != DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION {
644        return Err(DecayError::LlmSummaryAttestationRejected(format!(
645            "schema_version {} (expected {})",
646            envelope.schema_version, DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION,
647        )));
648    }
649    if envelope.purpose != DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE {
650        return Err(DecayError::LlmSummaryAttestationRejected(format!(
651            "purpose `{}` (expected `{}`)",
652            envelope.purpose, DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE,
653        )));
654    }
655    if envelope.decay_job_id != expected_job_id.to_string() {
656        return Err(DecayError::LlmSummaryAttestationRejected(format!(
657            "decay_job_id `{}` does not match job `{}`",
658            envelope.decay_job_id, expected_job_id,
659        )));
660    }
661    if envelope.model_name.trim().is_empty() {
662        return Err(DecayError::LlmSummaryAttestationRejected(
663            "model_name must be non-empty".into(),
664        ));
665    }
666    if !envelope.prompt_template_blake3.starts_with("blake3:") {
667        return Err(DecayError::LlmSummaryAttestationRejected(
668            "prompt_template_blake3 must use the `blake3:` prefix".into(),
669        ));
670    }
671
672    let verifying_key_bytes =
673        decode_lowercase_hex(&envelope.operator_verifying_key_hex).map_err(|detail| {
674            DecayError::LlmSummaryAttestationRejected(format!(
675                "operator_verifying_key_hex malformed: {detail}"
676            ))
677        })?;
678    if verifying_key_bytes.len() != 32 {
679        return Err(DecayError::LlmSummaryAttestationRejected(format!(
680            "operator_verifying_key_hex must decode to 32 bytes; got {}",
681            verifying_key_bytes.len()
682        )));
683    }
684    let mut key_array = [0u8; 32];
685    key_array.copy_from_slice(&verifying_key_bytes);
686    let verifying_key = VerifyingKey::from_bytes(&key_array).map_err(|err| {
687        DecayError::LlmSummaryAttestationRejected(format!(
688            "operator_verifying_key_hex did not parse: {err}"
689        ))
690    })?;
691
692    let signature_bytes = decode_lowercase_hex(&envelope.signature_hex).map_err(|detail| {
693        DecayError::LlmSummaryAttestationRejected(format!("signature_hex malformed: {detail}"))
694    })?;
695    if signature_bytes.len() != 64 {
696        return Err(DecayError::LlmSummaryAttestationRejected(format!(
697            "signature_hex must decode to 64 bytes; got {}",
698            signature_bytes.len()
699        )));
700    }
701    let mut sig_array = [0u8; 64];
702    sig_array.copy_from_slice(&signature_bytes);
703    let signature = Signature::from_bytes(&sig_array);
704
705    let signing_input = canonical_signing_input(envelope);
706    verifying_key
707        .verify(&signing_input, &signature)
708        .map_err(|_| {
709            DecayError::LlmSummaryAttestationRejected(
710                "Ed25519 signature did not verify under the declared operator key".into(),
711            )
712        })?;
713    Ok(())
714}
715
716/// Canonical signing input for the LLM-summary operator attestation
717/// envelope. Length-prefixed big-endian framing, fixed field order, with a
718/// 1-byte domain tag that is structurally disjoint from the
719/// migration-attestation domain. The LLM-summary domain tag is `0x21`.
720///
721/// Exposed publicly so out-of-crate test fixtures (and any operator-side
722/// signing tool) can produce envelopes whose signatures verify under this
723/// surface without re-deriving the wire shape.
724pub fn canonical_signing_input(env: &LlmSummaryOperatorAttestationEnvelope) -> Vec<u8> {
725    const DOMAIN_TAG_LLM_SUMMARY: u8 = 0x21;
726    let mut out = Vec::new();
727    out.push(DOMAIN_TAG_LLM_SUMMARY);
728    out.extend_from_slice(&env.schema_version.to_be_bytes());
729    push_lp(&mut out, env.purpose.as_bytes());
730    push_lp(&mut out, env.operator_key_id.as_bytes());
731    push_lp(&mut out, env.signed_at.as_bytes());
732    push_lp(&mut out, env.decay_job_id.as_bytes());
733    push_lp(&mut out, env.model_name.as_bytes());
734    push_lp(&mut out, env.prompt_template_blake3.as_bytes());
735    out
736}
737
738fn push_lp(out: &mut Vec<u8>, bytes: &[u8]) {
739    out.extend_from_slice(&(bytes.len() as u64).to_be_bytes());
740    out.extend_from_slice(bytes);
741}
742
743fn decode_lowercase_hex(input: &str) -> Result<Vec<u8>, String> {
744    if input.len() % 2 != 0 {
745        return Err(format!("odd hex length {}", input.len()));
746    }
747    let mut out = Vec::with_capacity(input.len() / 2);
748    let bytes = input.as_bytes();
749    let mut i = 0;
750    while i < bytes.len() {
751        let hi = hex_nibble(bytes[i]).ok_or_else(|| format!("invalid hex byte at offset {i}"))?;
752        let lo = hex_nibble(bytes[i + 1])
753            .ok_or_else(|| format!("invalid hex byte at offset {}", i + 1))?;
754        out.push((hi << 4) | lo);
755        i += 2;
756    }
757    Ok(out)
758}
759
760fn hex_nibble(byte: u8) -> Option<u8> {
761    match byte {
762        b'0'..=b'9' => Some(byte - b'0'),
763        b'a'..=b'f' => Some(byte - b'a' + 10),
764        _ => None,
765    }
766}
767
768#[cfg(test)]
769mod tests {
770    use super::*;
771    use chrono::Utc;
772    use cortex_core::DecayJobId;
773    use cortex_llm::NoopSummaryBackend;
774    use cortex_store::migrate::apply_pending;
775    use cortex_store::repo::DecayJobRecord;
776    use ed25519_dalek::{Signer, SigningKey};
777    use rusqlite::Connection;
778    use serde_json::json;
779
780    fn open_pool() -> Pool {
781        let pool = Connection::open_in_memory().expect("open in-memory pool");
782        apply_pending(&pool).expect("apply migrations");
783        pool
784    }
785
786    fn sample_llm_job(id: DecayJobId) -> DecayJobRecord {
787        let now = Utc::now();
788        DecayJobRecord {
789            id,
790            kind_wire: "candidate_compression".into(),
791            summary_method_wire: "llm_summary".into(),
792            source_ids_json: json!(["mem_01ARZ3NDEKTSV4RRFFQ69G5FAV"]),
793            state_wire: "in_progress".into(),
794            state_reason: None,
795            result_memory_id: None,
796            scheduled_for: now,
797            created_at: now,
798            created_by: "operator:test".into(),
799            updated_at: now,
800        }
801    }
802
803    fn lowercase_hex(bytes: &[u8]) -> String {
804        let mut s = String::with_capacity(bytes.len() * 2);
805        for b in bytes {
806            s.push_str(&format!("{b:02x}"));
807        }
808        s
809    }
810
811    fn signed_envelope(
812        job_id: &DecayJobId,
813        model: &str,
814        prompt_digest: &str,
815        signing_key: &SigningKey,
816    ) -> LlmSummaryOperatorAttestationEnvelope {
817        let signed_at = Utc::now().to_rfc3339();
818        let envelope = LlmSummaryOperatorAttestationEnvelope {
819            schema_version: DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION,
820            purpose: DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE.into(),
821            operator_verifying_key_hex: lowercase_hex(signing_key.verifying_key().as_bytes()),
822            operator_key_id: "cortex-operator-tests".into(),
823            signed_at,
824            decay_job_id: job_id.to_string(),
825            model_name: model.into(),
826            prompt_template_blake3: prompt_digest.into(),
827            signature_hex: String::new(),
828        };
829        let input = canonical_signing_input(&envelope);
830        let signature = signing_key.sign(&input);
831        let signature_hex = lowercase_hex(&signature.to_bytes());
832        LlmSummaryOperatorAttestationEnvelope {
833            signature_hex,
834            ..envelope
835        }
836    }
837
838    fn envelope_as_json(env: &LlmSummaryOperatorAttestationEnvelope) -> serde_json::Value {
839        serde_json::json!({
840            "schema_version": env.schema_version,
841            "purpose": env.purpose,
842            "operator_verifying_key_hex": env.operator_verifying_key_hex,
843            "operator_key_id": env.operator_key_id,
844            "signed_at": env.signed_at,
845            "decay_job_id": env.decay_job_id,
846            "model_name": env.model_name,
847            "prompt_template_blake3": env.prompt_template_blake3,
848            "signature_hex": env.signature_hex,
849        })
850    }
851
852    #[test]
853    fn llm_summary_refuses_without_operator_attestation() {
854        let pool = open_pool();
855        let id = DecayJobId::new();
856        let job = sample_llm_job(id);
857        let backend = NoopSummaryBackend;
858
859        let err = run_llm_summary_job(&pool, &job, None, &backend).expect_err("must refuse");
860        assert!(
861            matches!(err, DecayError::LlmSummaryRequiresOperatorAttestation),
862            "got {err:?}"
863        );
864        assert_eq!(
865            err.invariant(),
866            Some(super::super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
867        );
868    }
869
870    #[test]
871    fn llm_summary_refuses_on_deterministic_method_record() {
872        // A deterministic-concatenate record dispatched here is a contract
873        // violation; the runner must not route to this entry point. The
874        // function refuses fail-closed with a Validation error.
875        let pool = open_pool();
876        let id = DecayJobId::new();
877        let mut job = sample_llm_job(id);
878        job.summary_method_wire = "deterministic_concatenate".into();
879        let backend = NoopSummaryBackend;
880        let err = run_llm_summary_job(&pool, &job, None, &backend).expect_err("must refuse");
881        assert!(matches!(err, DecayError::Validation(_)), "got {err:?}");
882    }
883
884    #[test]
885    fn llm_summary_returns_backend_not_configured_with_noop() {
886        // Previously this test asserted a panic at `unimplemented!()`.
887        // With the SummaryBackend trait wired, the envelope verifies and
888        // the NoopSummaryBackend returns BackendNotConfigured, which the
889        // runner wraps as DecayError::LlmSummaryBackendCallFailed with
890        // the stable invariant.
891        let pool = open_pool();
892        // Seed a real candidate memory so the source lookup succeeds and
893        // the call reaches the backend (rather than failing on a
894        // source-missing validation error first).
895        let source_id = MemoryId::new();
896        seed_candidate_memory(&pool, &source_id, "alpha");
897
898        let id = DecayJobId::new();
899        let mut job = sample_llm_job(id);
900        job.source_ids_json = json!([source_id.to_string()]);
901        let dir = std::env::temp_dir().join(format!("cortex-decay-test-{}", id.as_ulid()));
902        std::fs::create_dir_all(&dir).unwrap();
903        let env_path = dir.join("attestation.json");
904
905        let signing_key = SigningKey::from_bytes(&[7u8; 32]);
906        let envelope = signed_envelope(
907            &id,
908            "claude-sonnet-4-7@1",
909            "blake3:0000000000000000000000000000000000000000000000000000000000000000",
910            &signing_key,
911        );
912        std::fs::write(
913            &env_path,
914            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
915        )
916        .unwrap();
917
918        let backend = NoopSummaryBackend;
919        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
920            .expect_err("noop backend must refuse");
921        match err {
922            DecayError::LlmSummaryBackendCallFailed(detail) => {
923                assert!(
924                    detail.contains("summary_backend_not_configured"),
925                    "detail: {detail}"
926                );
927            }
928            other => panic!("expected LlmSummaryBackendCallFailed, got {other:?}"),
929        }
930    }
931
932    fn seed_candidate_memory(pool: &Pool, id: &MemoryId, claim: &str) {
933        let candidate = MemoryCandidate {
934            id: *id,
935            memory_type: "semantic".into(),
936            claim: claim.into(),
937            source_episodes_json: Value::Array(Vec::new()),
938            source_events_json: Value::Array(vec![Value::String(
939                "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV".into(),
940            )]),
941            domains_json: Value::Array(vec![Value::String("t".into())]),
942            salience_json: Value::Object(serde_json::Map::new()),
943            confidence: 0.7,
944            authority: "candidate".into(),
945            applies_when_json: Value::Object(serde_json::Map::new()),
946            does_not_apply_when_json: Value::Array(Vec::new()),
947            created_at: Utc::now(),
948            updated_at: Utc::now(),
949        };
950        MemoryRepo::new(pool).insert_candidate(&candidate).unwrap();
951    }
952
953    #[test]
954    fn llm_summary_rejects_envelope_for_wrong_job() {
955        let pool = open_pool();
956        let id = DecayJobId::new();
957        let other = DecayJobId::new();
958        let job = sample_llm_job(id);
959
960        let dir =
961            std::env::temp_dir().join(format!("cortex-decay-test-wrong-job-{}", id.as_ulid()));
962        std::fs::create_dir_all(&dir).unwrap();
963        let env_path = dir.join("attestation.json");
964
965        let signing_key = SigningKey::from_bytes(&[3u8; 32]);
966        let envelope = signed_envelope(
967            &other,
968            "claude-sonnet-4-7@1",
969            "blake3:1111111111111111111111111111111111111111111111111111111111111111",
970            &signing_key,
971        );
972        std::fs::write(
973            &env_path,
974            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
975        )
976        .unwrap();
977        let backend = NoopSummaryBackend;
978        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
979            .expect_err("envelope mismatch must refuse");
980        assert!(
981            matches!(err, DecayError::LlmSummaryAttestationRejected(_)),
982            "got {err:?}"
983        );
984        assert_eq!(
985            err.invariant(),
986            Some(super::super::DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT)
987        );
988    }
989
990    #[test]
991    fn llm_summary_rejects_tampered_signature() {
992        let pool = open_pool();
993        let id = DecayJobId::new();
994        let job = sample_llm_job(id);
995
996        let dir = std::env::temp_dir().join(format!("cortex-decay-test-tampered-{}", id.as_ulid()));
997        std::fs::create_dir_all(&dir).unwrap();
998        let env_path = dir.join("attestation.json");
999
1000        let signing_key = SigningKey::from_bytes(&[1u8; 32]);
1001        let mut envelope = signed_envelope(
1002            &id,
1003            "claude-sonnet-4-7@1",
1004            "blake3:2222222222222222222222222222222222222222222222222222222222222222",
1005            &signing_key,
1006        );
1007        // Tamper: flip one byte of the signature.
1008        envelope.signature_hex.replace_range(0..2, "ff");
1009        std::fs::write(
1010            &env_path,
1011            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
1012        )
1013        .unwrap();
1014        let backend = NoopSummaryBackend;
1015        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
1016            .expect_err("tampered signature must refuse");
1017        match err {
1018            DecayError::LlmSummaryAttestationRejected(detail) => {
1019                assert!(
1020                    detail.contains("signature") || detail.contains("did not verify"),
1021                    "detail: {detail}"
1022                );
1023            }
1024            other => panic!("expected LlmSummaryAttestationRejected, got {other:?}"),
1025        }
1026    }
1027
1028    #[test]
1029    fn llm_summary_rejects_malformed_envelope_json() {
1030        let pool = open_pool();
1031        let id = DecayJobId::new();
1032        let job = sample_llm_job(id);
1033
1034        let dir = std::env::temp_dir().join(format!("cortex-decay-test-bad-json-{}", id.as_ulid()));
1035        std::fs::create_dir_all(&dir).unwrap();
1036        let env_path = dir.join("attestation.json");
1037        std::fs::write(&env_path, "{ this is not json").unwrap();
1038
1039        let backend = NoopSummaryBackend;
1040        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
1041            .expect_err("malformed JSON must refuse");
1042        assert!(matches!(err, DecayError::LlmSummaryAttestationRejected(_)));
1043    }
1044}