1use std::collections::BTreeSet;
79use std::path::Path;
80
81use chrono::Utc;
82use cortex_core::{DecayJobId, EpisodeId, MemoryId};
83use cortex_llm::{SummaryBackend, SummaryError, SummaryRequest, SummaryResponse};
84use cortex_store::repo::{
85 DecayJobRecord, DecayJobRepo, EpisodeRecord, EpisodeRepo, MemoryCandidate, MemoryRecord,
86 MemoryRepo,
87};
88use cortex_store::Pool;
89use ed25519_dalek::{Signature, Verifier, VerifyingKey};
90use serde::Deserialize;
91use serde_json::Value;
92
93use super::{
94 DecayError, DecayJobKind, DecayResult, SummaryMethod, DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE,
95 DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION, DECAY_SUMMARY_MAX_CLAIM_BYTES,
96};
97
/// Operator-signed attestation that authorizes a single LLM-summary decay job.
///
/// The envelope pins the model and prompt template the operator approved and
/// binds them to one specific decay job id; its Ed25519 signature is checked
/// against the canonical signing input (see `canonical_signing_input`) before
/// any backend call is made.
#[derive(Debug, Clone, Deserialize)]
pub struct LlmSummaryOperatorAttestationEnvelope {
    /// Envelope schema version; must equal
    /// `DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION`.
    pub schema_version: u16,
    /// Purpose tag; must equal `DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE`.
    pub purpose: String,
    /// Lowercase-hex Ed25519 verifying key (must decode to exactly 32 bytes).
    pub operator_verifying_key_hex: String,
    /// Operator-chosen key identifier; covered by the signature but not used
    /// for lookups in this module.
    pub operator_key_id: String,
    /// Timestamp string recorded at signing time; covered by the signature.
    /// NOTE(review): not validated for freshness here — confirm if expiry is
    /// enforced elsewhere.
    pub signed_at: String,
    /// The decay job this attestation authorizes; must match the job record.
    pub decay_job_id: String,
    /// Model the operator approved; the backend must echo it exactly.
    pub model_name: String,
    /// Digest of the prompt template; must carry the `blake3:` prefix.
    pub prompt_template_blake3: String,
    /// Lowercase-hex Ed25519 signature (must decode to exactly 64 bytes) over
    /// the canonical signing input.
    pub signature_hex: String,
}
130
/// Which table a job's `source_ids_json` entries refer to, derived from the
/// job record's `kind_wire` (see `source_kind_from_job`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum LlmSummarySourceKind {
    /// Sources are memory records (`candidate_compression` jobs).
    Memory,
    /// Sources are episode records (`episode_compression` jobs).
    Episode,
}
142
/// Executes an `llm_summary` decay job end to end and returns the id of the
/// newly persisted summary memory.
///
/// Refusal order:
/// 1. the job record's `summary_method_wire` must match the LLM-summary wire
///    form (operator attestation required);
/// 2. an operator attestation path must be supplied;
/// 3. the envelope must load as JSON and verify (schema, purpose, job id,
///    Ed25519 signature) for this exact job.
///
/// Only after all three pass do we load sources, call the backend, validate
/// its response against the attested model, and persist the summary plus
/// supersession records.
pub fn run_llm_summary_job(
    pool: &Pool,
    job: &DecayJobRecord,
    operator_attestation: Option<&Path>,
    summary_backend: &dyn SummaryBackend,
) -> DecayResult<MemoryId> {
    // The record's wire string must pin the attestation-required LLM method;
    // anything else means this entry point was dispatched incorrectly.
    let expected_wire = SummaryMethod::LlmSummary {
        operator_attestation_required: true,
    }
    .method_wire();
    if job.summary_method_wire != expected_wire {
        return Err(DecayError::Validation(format!(
            "run_llm_summary_job invoked on a job whose summary_method_wire is `{}` (expected `{expected_wire}`)",
            job.summary_method_wire,
        )));
    }

    // Hard refusal before any filesystem or database I/O: no attestation, no job.
    let path = operator_attestation.ok_or(DecayError::LlmSummaryRequiresOperatorAttestation)?;
    let envelope = load_envelope(path)?;
    verify_envelope_for_job(&envelope, &job.id)?;

    // Memory- and episode-sourced jobs share the same pipeline shape:
    // parse ids -> load sources -> build request -> call backend ->
    // validate response -> build candidate -> persist + supersede.
    let source_kind = source_kind_from_job(job)?;
    match source_kind {
        LlmSummarySourceKind::Memory => {
            let source_ids = parse_memory_source_ids(job)?;
            let sources = load_memory_sources(pool, &source_ids)?;
            let (request, claims) = build_request_from_memory_sources(&envelope, job, &sources);
            let response = call_backend(summary_backend, &request)?;
            validate_response(&response, &envelope, &claims)?;
            let candidate = build_candidate_from_memory_sources(&sources, &source_ids, &response);
            persist_summary_memory(pool, &candidate, job, &sources, &[])
        }
        LlmSummarySourceKind::Episode => {
            let source_ids = parse_episode_source_ids(job)?;
            let sources = load_episode_sources(pool, &source_ids)?;
            let (request, claims) = build_request_from_episode_sources(&envelope, job, &sources);
            let response = call_backend(summary_backend, &request)?;
            validate_response(&response, &envelope, &claims)?;
            let candidate = build_candidate_from_episode_sources(&sources, &source_ids, &response);
            persist_summary_memory(pool, &candidate, job, &[], &sources)
        }
    }
}
213
214pub fn run_llm_summary_job_typed(
217 pool: &Pool,
218 job: &DecayJobRecord,
219 kind: &DecayJobKind,
220 operator_attestation: Option<&Path>,
221 summary_backend: &dyn SummaryBackend,
222) -> DecayResult<MemoryId> {
223 match kind.summary_method() {
224 Some(SummaryMethod::LlmSummary { .. }) => {
225 run_llm_summary_job(pool, job, operator_attestation, summary_backend)
226 }
227 Some(SummaryMethod::DeterministicConcatenate) => Err(DecayError::Validation(
228 "run_llm_summary_job_typed invoked on a deterministic-concatenate job".into(),
229 )),
230 None => Err(DecayError::Validation(
231 "run_llm_summary_job_typed invoked on a kind that carries no summary method".into(),
232 )),
233 }
234}
235
236fn source_kind_from_job(job: &DecayJobRecord) -> DecayResult<LlmSummarySourceKind> {
237 match job.kind_wire.as_str() {
238 "candidate_compression" => Ok(LlmSummarySourceKind::Memory),
239 "episode_compression" => Ok(LlmSummarySourceKind::Episode),
240 other => Err(DecayError::Validation(format!(
241 "run_llm_summary_job invoked on a job whose kind_wire is `{other}` (expected `candidate_compression` or `episode_compression`)",
242 ))),
243 }
244}
245
246fn parse_memory_source_ids(job: &DecayJobRecord) -> DecayResult<Vec<MemoryId>> {
247 let array = job.source_ids_json.as_array().ok_or_else(|| {
248 DecayError::Validation("run_llm_summary_job: source_ids_json must be a JSON array".into())
249 })?;
250 let mut out = Vec::with_capacity(array.len());
251 for value in array {
252 let raw = value.as_str().ok_or_else(|| {
253 DecayError::Validation(
254 "run_llm_summary_job: source_ids_json entries must be strings".into(),
255 )
256 })?;
257 let id = raw.parse::<MemoryId>().map_err(|err| {
258 DecayError::Validation(format!(
259 "run_llm_summary_job: source id `{raw}` is not a memory id: {err}",
260 ))
261 })?;
262 out.push(id);
263 }
264 if out.is_empty() {
265 return Err(DecayError::Validation(
266 "run_llm_summary_job: source_ids_json must contain at least one id".into(),
267 ));
268 }
269 Ok(out)
270}
271
272fn parse_episode_source_ids(job: &DecayJobRecord) -> DecayResult<Vec<EpisodeId>> {
273 let array = job.source_ids_json.as_array().ok_or_else(|| {
274 DecayError::Validation("run_llm_summary_job: source_ids_json must be a JSON array".into())
275 })?;
276 let mut out = Vec::with_capacity(array.len());
277 for value in array {
278 let raw = value.as_str().ok_or_else(|| {
279 DecayError::Validation(
280 "run_llm_summary_job: source_ids_json entries must be strings".into(),
281 )
282 })?;
283 let id = raw.parse::<EpisodeId>().map_err(|err| {
284 DecayError::Validation(format!(
285 "run_llm_summary_job: source id `{raw}` is not an episode id: {err}",
286 ))
287 })?;
288 out.push(id);
289 }
290 if out.is_empty() {
291 return Err(DecayError::Validation(
292 "run_llm_summary_job: source_ids_json must contain at least one id".into(),
293 ));
294 }
295 Ok(out)
296}
297
298fn load_memory_sources(pool: &Pool, ids: &[MemoryId]) -> DecayResult<Vec<MemoryRecord>> {
299 let repo = MemoryRepo::new(pool);
300 let mut out = Vec::with_capacity(ids.len());
301 for id in ids {
302 match repo.get_by_id(id)? {
303 Some(record) => out.push(record),
304 None => {
305 return Err(DecayError::Validation(format!(
306 "{}: memory {id} not found",
307 super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT
308 )));
309 }
310 }
311 }
312 Ok(out)
313}
314
315fn load_episode_sources(pool: &Pool, ids: &[EpisodeId]) -> DecayResult<Vec<EpisodeRecord>> {
316 let repo = EpisodeRepo::new(pool);
317 let mut out = Vec::with_capacity(ids.len());
318 for id in ids {
319 match repo.get_by_id(id)? {
320 Some(record) => out.push(record),
321 None => {
322 return Err(DecayError::Validation(format!(
323 "{}: episode {id} not found",
324 super::DECAY_COMPRESS_SOURCE_MISSING_INVARIANT
325 )));
326 }
327 }
328 }
329 Ok(out)
330}
331
332fn build_request_from_memory_sources(
333 envelope: &LlmSummaryOperatorAttestationEnvelope,
334 job: &DecayJobRecord,
335 sources: &[MemoryRecord],
336) -> (SummaryRequest, Vec<String>) {
337 let claims: Vec<String> = sources.iter().map(|m| m.claim.clone()).collect();
338 let request = SummaryRequest {
339 model_name: envelope.model_name.clone(),
340 prompt_template_blake3: envelope.prompt_template_blake3.clone(),
341 source_claims: claims.clone(),
342 max_output_bytes: Some(DECAY_SUMMARY_MAX_CLAIM_BYTES),
343 decay_job_id: Some(job.id.to_string()),
344 };
345 (request, claims)
346}
347
348fn build_request_from_episode_sources(
349 envelope: &LlmSummaryOperatorAttestationEnvelope,
350 job: &DecayJobRecord,
351 sources: &[EpisodeRecord],
352) -> (SummaryRequest, Vec<String>) {
353 let claims: Vec<String> = sources.iter().map(|e| e.summary.clone()).collect();
354 let request = SummaryRequest {
355 model_name: envelope.model_name.clone(),
356 prompt_template_blake3: envelope.prompt_template_blake3.clone(),
357 source_claims: claims.clone(),
358 max_output_bytes: Some(DECAY_SUMMARY_MAX_CLAIM_BYTES),
359 decay_job_id: Some(job.id.to_string()),
360 };
361 (request, claims)
362}
363
364fn call_backend(
365 backend: &dyn SummaryBackend,
366 request: &SummaryRequest,
367) -> DecayResult<SummaryResponse> {
368 backend
369 .summarize(request)
370 .map_err(|err| DecayError::LlmSummaryBackendCallFailed(format_backend_error(err)))
371}
372
373fn format_backend_error(err: SummaryError) -> String {
374 err.to_string()
378}
379
380fn validate_response(
381 response: &SummaryResponse,
382 envelope: &LlmSummaryOperatorAttestationEnvelope,
383 source_claims: &[String],
384) -> DecayResult<()> {
385 if response.model_name_echoed != envelope.model_name {
386 return Err(DecayError::LlmSummaryBackendCallFailed(format!(
387 "model mismatch: backend echoed model_name=`{}` but operator attestation pinned model_name=`{}`",
388 response.model_name_echoed, envelope.model_name,
389 )));
390 }
391 if response.claim.trim().is_empty() {
392 return Err(DecayError::LlmSummaryBackendCallFailed(
393 "backend produced an empty summary claim".into(),
394 ));
395 }
396 if response.claim.len() > DECAY_SUMMARY_MAX_CLAIM_BYTES {
397 return Err(DecayError::LlmSummaryBackendCallFailed(format!(
398 "backend produced a summary claim of {} bytes (limit {DECAY_SUMMARY_MAX_CLAIM_BYTES})",
399 response.claim.len(),
400 )));
401 }
402 if source_claims.is_empty() {
403 return Err(DecayError::Validation(
404 "run_llm_summary_job: source_claims must not be empty after loading sources".into(),
405 ));
406 }
407 Ok(())
408}
409
/// Builds the summary memory candidate for memory-sourced jobs.
///
/// Aggregation policy:
/// - confidence is the minimum across sources, clamped to [0, 1];
/// - authority is the lowest-ranked authority label among the sources;
/// - domains / source_events / source_episodes are de-duplicated unions of
///   the corresponding source arrays;
/// - if BOTH provenance unions come out empty, the source memory ids
///   themselves are written into `source_episodes_json` so the summary never
///   loses its lineage entirely.
fn build_candidate_from_memory_sources(
    sources: &[MemoryRecord],
    source_ids: &[MemoryId],
    response: &SummaryResponse,
) -> MemoryCandidate {
    // Pessimistic aggregates: a summary must never look more trustworthy
    // than its weakest source.
    let confidence = pessimistic_min_confidence(sources.iter().map(|m| m.confidence));
    let authority = lowest_authority_label(sources.iter().map(|m| m.authority.as_str()));

    let domains = union_json_strings(sources.iter().map(|m| &m.domains_json));
    let source_events = union_json_strings(sources.iter().map(|m| &m.source_events_json));
    let source_episodes = union_json_strings(sources.iter().map(|m| &m.source_episodes_json));

    // Fallback lineage: when neither union carries provenance, record the
    // source memory ids directly.
    let source_episodes =
        if json_array_is_empty(&source_episodes) && json_array_is_empty(&source_events) {
            Value::Array(
                source_ids
                    .iter()
                    .map(|id| Value::String(id.to_string()))
                    .collect(),
            )
        } else {
            source_episodes
        };

    // applies_when records exactly what was summarized and which model the
    // backend echoed for it.
    let applies_when = serde_json::json!({
        "summary_of_memories": source_ids
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>(),
        "llm_summary": {
            "model_name": response.model_name_echoed,
        }
    });

    let now = Utc::now();
    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim: response.claim.clone(),
        source_episodes_json: source_episodes,
        source_events_json: source_events,
        domains_json: domains,
        salience_json: Value::Object(serde_json::Map::new()),
        confidence,
        authority,
        applies_when_json: applies_when,
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: now,
        updated_at: now,
    }
}
469
/// Builds the summary memory candidate for episode-sourced jobs.
///
/// Differences from the memory-sourced variant:
/// - authority is always `"derived"` (episodes carry no authority label);
/// - `source_episodes_json` is always the episode ids themselves, so no
///   empty-provenance fallback is needed.
fn build_candidate_from_episode_sources(
    sources: &[EpisodeRecord],
    source_ids: &[EpisodeId],
    response: &SummaryResponse,
) -> MemoryCandidate {
    // Pessimistic confidence: minimum across sources, clamped to [0, 1].
    let confidence = pessimistic_min_confidence(sources.iter().map(|e| e.confidence));
    let authority = "derived".to_string();

    let domains = union_json_strings(sources.iter().map(|e| &e.domains_json));
    let source_events = union_json_strings(sources.iter().map(|e| &e.source_events_json));
    // Lineage points straight at the summarized episodes.
    let source_episodes = Value::Array(
        source_ids
            .iter()
            .map(|id| Value::String(id.to_string()))
            .collect(),
    );

    // applies_when records exactly what was summarized and which model the
    // backend echoed for it.
    let applies_when = serde_json::json!({
        "summary_of_episodes": source_ids
            .iter()
            .map(ToString::to_string)
            .collect::<Vec<_>>(),
        "llm_summary": {
            "model_name": response.model_name_echoed,
        }
    });

    let now = Utc::now();
    MemoryCandidate {
        id: MemoryId::new(),
        memory_type: "summary".into(),
        claim: response.claim.clone(),
        source_episodes_json: source_episodes,
        source_events_json: source_events,
        domains_json: domains,
        salience_json: Value::Object(serde_json::Map::new()),
        confidence,
        authority,
        applies_when_json: applies_when,
        does_not_apply_when_json: Value::Array(Vec::new()),
        created_at: now,
        updated_at: now,
    }
}
517
518fn persist_summary_memory(
519 pool: &Pool,
520 candidate: &MemoryCandidate,
521 job: &DecayJobRecord,
522 memory_sources: &[MemoryRecord],
523 episode_sources: &[EpisodeRecord],
524) -> DecayResult<MemoryId> {
525 let memory_repo = MemoryRepo::new(pool);
526 memory_repo.insert_candidate(candidate)?;
527 let summary_id = candidate.id;
528
529 let job_repo = DecayJobRepo::new(pool);
530 let now = Utc::now();
531 for source in memory_sources {
532 job_repo.record_memory_supersession(&source.id, &summary_id, Some(&job.id), now)?;
533 }
534 for source in episode_sources {
535 job_repo.record_episode_supersession(&source.id, &summary_id, Some(&job.id), now)?;
536 }
537 Ok(summary_id)
538}
539
/// Returns the minimum confidence across `values`, clamped to `[0.0, 1.0]`.
///
/// Fix: the previous implementation folded from `f64::INFINITY`, so an EMPTY
/// input clamped to 1.0 — maximal confidence, the opposite of "pessimistic".
/// An empty input now yields 0.0. Non-empty behavior is unchanged, including
/// `f64::min`'s NaN handling (a NaN operand is ignored in favor of the other
/// argument).
fn pessimistic_min_confidence<I: IntoIterator<Item = f64>>(values: I) -> f64 {
    let mut seen_any = false;
    let mut min = f64::INFINITY;
    for v in values {
        seen_any = true;
        min = min.min(v);
    }
    if seen_any {
        min.clamp(0.0, 1.0)
    } else {
        // No sources at all: be maximally pessimistic, not maximally trusting.
        0.0
    }
}
546
/// Picks the lowest-authority label among `labels`.
///
/// Ranking (lowest first): unknown/derived < candidate < agent < user; the
/// comparison accepts both lowercase and capitalized spellings. Ties keep the
/// LAST label seen at the minimum rank, and the original casing of the chosen
/// label is preserved. An empty input falls back to `"user"`.
fn lowest_authority_label<'a, I: IntoIterator<Item = &'a str>>(labels: I) -> String {
    // Numeric rank; anything unrecognized ranks lowest (treated as derived).
    fn rank(label: &str) -> u8 {
        match label {
            "user" | "User" => 3,
            "agent" | "Agent" => 2,
            "candidate" | "Candidate" => 1,
            _ => 0,
        }
    }
    let mut lowest: Option<(u8, String)> = None;
    for label in labels {
        let r = rank(label);
        // `<=` (not `<`) so a later label at the same rank replaces the
        // earlier one, matching the established tie-break.
        if lowest.as_ref().map_or(true, |(best, _)| r <= *best) {
            lowest = Some((r, label.to_string()));
        }
    }
    match lowest {
        Some((_, label)) => label,
        None => "user".into(),
    }
}
580
581fn union_json_strings<'a, I: IntoIterator<Item = &'a Value>>(arrays: I) -> Value {
582 let mut seen: BTreeSet<String> = BTreeSet::new();
583 let mut ordered: Vec<Value> = Vec::new();
584 for value in arrays {
585 match value {
586 Value::Array(items) => {
587 for item in items {
588 let key = canonical_key(item);
589 if seen.insert(key) {
590 ordered.push(item.clone());
591 }
592 }
593 }
594 Value::String(s) => {
595 let v = Value::String(s.clone());
596 let key = canonical_key(&v);
597 if seen.insert(key) {
598 ordered.push(v);
599 }
600 }
601 _ => {}
602 }
603 }
604 Value::Array(ordered)
605}
606
607fn json_array_is_empty(value: &Value) -> bool {
608 match value {
609 Value::Array(a) => a.is_empty(),
610 _ => true,
611 }
612}
613
614fn canonical_key(value: &Value) -> String {
615 serde_json::to_string(value).unwrap_or_else(|_| value.to_string())
616}
617
618fn load_envelope(path: &Path) -> DecayResult<LlmSummaryOperatorAttestationEnvelope> {
619 if !path.is_file() {
620 return Err(DecayError::LlmSummaryAttestationRejected(format!(
621 "envelope `{}` not found",
622 path.display()
623 )));
624 }
625 let raw = std::fs::read_to_string(path).map_err(|err| {
626 DecayError::LlmSummaryAttestationRejected(format!(
627 "envelope `{}` could not be read: {err}",
628 path.display()
629 ))
630 })?;
631 serde_json::from_str(&raw).map_err(|err| {
632 DecayError::LlmSummaryAttestationRejected(format!(
633 "envelope `{}` is not valid JSON: {err}",
634 path.display()
635 ))
636 })
637}
638
/// Verifies an operator attestation envelope against a specific decay job.
///
/// Checks, in order: schema version, purpose tag, job-id binding, non-empty
/// model name, `blake3:` digest prefix, verifying-key decoding (lowercase
/// hex, 32 bytes), signature decoding (lowercase hex, 64 bytes), and finally
/// the Ed25519 signature over the canonical signing input. Any failure maps
/// to `DecayError::LlmSummaryAttestationRejected`.
fn verify_envelope_for_job(
    envelope: &LlmSummaryOperatorAttestationEnvelope,
    expected_job_id: &DecayJobId,
) -> DecayResult<()> {
    // Cheap structural checks come first, before any crypto work.
    if envelope.schema_version != DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "schema_version {} (expected {})",
            envelope.schema_version, DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION,
        )));
    }
    if envelope.purpose != DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "purpose `{}` (expected `{}`)",
            envelope.purpose, DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE,
        )));
    }
    // The envelope is single-use: it must name exactly this job.
    if envelope.decay_job_id != expected_job_id.to_string() {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "decay_job_id `{}` does not match job `{}`",
            envelope.decay_job_id, expected_job_id,
        )));
    }
    if envelope.model_name.trim().is_empty() {
        return Err(DecayError::LlmSummaryAttestationRejected(
            "model_name must be non-empty".into(),
        ));
    }
    if !envelope.prompt_template_blake3.starts_with("blake3:") {
        return Err(DecayError::LlmSummaryAttestationRejected(
            "prompt_template_blake3 must use the `blake3:` prefix".into(),
        ));
    }

    // Decode and parse the operator's Ed25519 verifying key (32 bytes).
    let verifying_key_bytes =
        decode_lowercase_hex(&envelope.operator_verifying_key_hex).map_err(|detail| {
            DecayError::LlmSummaryAttestationRejected(format!(
                "operator_verifying_key_hex malformed: {detail}"
            ))
        })?;
    if verifying_key_bytes.len() != 32 {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "operator_verifying_key_hex must decode to 32 bytes; got {}",
            verifying_key_bytes.len()
        )));
    }
    let mut key_array = [0u8; 32];
    key_array.copy_from_slice(&verifying_key_bytes);
    let verifying_key = VerifyingKey::from_bytes(&key_array).map_err(|err| {
        DecayError::LlmSummaryAttestationRejected(format!(
            "operator_verifying_key_hex did not parse: {err}"
        ))
    })?;

    // Decode the signature (64 bytes).
    let signature_bytes = decode_lowercase_hex(&envelope.signature_hex).map_err(|detail| {
        DecayError::LlmSummaryAttestationRejected(format!("signature_hex malformed: {detail}"))
    })?;
    if signature_bytes.len() != 64 {
        return Err(DecayError::LlmSummaryAttestationRejected(format!(
            "signature_hex must decode to 64 bytes; got {}",
            signature_bytes.len()
        )));
    }
    let mut sig_array = [0u8; 64];
    sig_array.copy_from_slice(&signature_bytes);
    let signature = Signature::from_bytes(&sig_array);

    // Verify the signature over the deterministic signing input; the error
    // detail is deliberately generic to avoid leaking verifier internals.
    let signing_input = canonical_signing_input(envelope);
    verifying_key
        .verify(&signing_input, &signature)
        .map_err(|_| {
            DecayError::LlmSummaryAttestationRejected(
                "Ed25519 signature did not verify under the declared operator key".into(),
            )
        })?;
    Ok(())
}
715
/// Computes the exact byte string the operator signs (and this module
/// verifies) for an LLM-summary attestation.
///
/// Layout (all integers big-endian):
/// - 1 byte domain tag `0x21`
/// - u16 `schema_version`
/// - then, each as a u64 length-prefixed byte string: `purpose`,
///   `operator_key_id`, `signed_at`, `decay_job_id`, `model_name`,
///   `prompt_template_blake3`.
///
/// The verifying key and signature fields are deliberately NOT part of the
/// input. Any change here invalidates all previously issued attestations.
pub fn canonical_signing_input(env: &LlmSummaryOperatorAttestationEnvelope) -> Vec<u8> {
    // Domain-separation tag so this signing input can never collide with
    // other signed structures in the system.
    const DOMAIN_TAG_LLM_SUMMARY: u8 = 0x21;
    let mut out = Vec::new();
    out.push(DOMAIN_TAG_LLM_SUMMARY);
    out.extend_from_slice(&env.schema_version.to_be_bytes());
    push_lp(&mut out, env.purpose.as_bytes());
    push_lp(&mut out, env.operator_key_id.as_bytes());
    push_lp(&mut out, env.signed_at.as_bytes());
    push_lp(&mut out, env.decay_job_id.as_bytes());
    push_lp(&mut out, env.model_name.as_bytes());
    push_lp(&mut out, env.prompt_template_blake3.as_bytes());
    out
}
737
/// Appends `bytes` to `out` as a length-prefixed field: an 8-byte big-endian
/// u64 length followed by the raw bytes.
fn push_lp(out: &mut Vec<u8>, bytes: &[u8]) {
    let length = bytes.len() as u64;
    out.extend_from_slice(&length.to_be_bytes());
    out.extend_from_slice(bytes);
}
742
/// Decodes a strictly-lowercase hex string into bytes.
///
/// Rejects odd-length input and any character outside `0-9a-f` (uppercase is
/// deliberately rejected: the envelope wire format is lowercase-only), with
/// the offending byte offset in the error message.
fn decode_lowercase_hex(input: &str) -> Result<Vec<u8>, String> {
    if input.len() % 2 != 0 {
        return Err(format!("odd hex length {}", input.len()));
    }
    // Lowercase-only nibble decoder.
    fn nibble(byte: u8) -> Option<u8> {
        match byte {
            b'0'..=b'9' => Some(byte - b'0'),
            b'a'..=b'f' => Some(byte - b'a' + 10),
            _ => None,
        }
    }
    input
        .as_bytes()
        .chunks_exact(2)
        .enumerate()
        .map(|(idx, pair)| -> Result<u8, String> {
            let hi = nibble(pair[0])
                .ok_or_else(|| format!("invalid hex byte at offset {}", idx * 2))?;
            let lo = nibble(pair[1])
                .ok_or_else(|| format!("invalid hex byte at offset {}", idx * 2 + 1))?;
            Ok((hi << 4) | lo)
        })
        .collect()
}
759
/// Decodes one lowercase hex digit to its value; `None` for anything outside
/// `0-9a-f` (uppercase is intentionally rejected).
fn hex_nibble(byte: u8) -> Option<u8> {
    if byte.is_ascii_digit() {
        Some(byte - b'0')
    } else if (b'a'..=b'f').contains(&byte) {
        Some(byte - b'a' + 10)
    } else {
        None
    }
}
767
/// Unit tests covering every refusal path of `run_llm_summary_job`:
/// missing attestation, wrong summary-method record, unconfigured backend,
/// wrong-job envelope, tampered signature, and malformed envelope JSON —
/// all against an in-memory store.
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use cortex_core::DecayJobId;
    use cortex_llm::NoopSummaryBackend;
    use cortex_store::migrate::apply_pending;
    use cortex_store::repo::DecayJobRecord;
    use ed25519_dalek::{Signer, SigningKey};
    use rusqlite::Connection;
    use serde_json::json;

    /// Opens a fresh in-memory SQLite pool with all migrations applied.
    fn open_pool() -> Pool {
        let pool = Connection::open_in_memory().expect("open in-memory pool");
        apply_pending(&pool).expect("apply migrations");
        pool
    }

    /// Builds an in-progress `candidate_compression` job whose wire fields
    /// select the LLM-summary path.
    fn sample_llm_job(id: DecayJobId) -> DecayJobRecord {
        let now = Utc::now();
        DecayJobRecord {
            id,
            kind_wire: "candidate_compression".into(),
            summary_method_wire: "llm_summary".into(),
            source_ids_json: json!(["mem_01ARZ3NDEKTSV4RRFFQ69G5FAV"]),
            state_wire: "in_progress".into(),
            state_reason: None,
            result_memory_id: None,
            scheduled_for: now,
            created_at: now,
            created_by: "operator:test".into(),
            updated_at: now,
        }
    }

    /// Hex-encodes bytes as lowercase, matching the envelope wire format.
    fn lowercase_hex(bytes: &[u8]) -> String {
        let mut s = String::with_capacity(bytes.len() * 2);
        for b in bytes {
            s.push_str(&format!("{b:02x}"));
        }
        s
    }

    /// Builds an envelope for `job_id` and signs its canonical signing input
    /// with `signing_key`. Two-pass: `signature_hex` is empty while computing
    /// the signing input (the signature is not part of that input), then the
    /// real signature is filled in.
    fn signed_envelope(
        job_id: &DecayJobId,
        model: &str,
        prompt_digest: &str,
        signing_key: &SigningKey,
    ) -> LlmSummaryOperatorAttestationEnvelope {
        let signed_at = Utc::now().to_rfc3339();
        let envelope = LlmSummaryOperatorAttestationEnvelope {
            schema_version: DECAY_LLM_SUMMARY_ATTESTATION_SCHEMA_VERSION,
            purpose: DECAY_LLM_SUMMARY_ATTESTATION_PURPOSE.into(),
            operator_verifying_key_hex: lowercase_hex(signing_key.verifying_key().as_bytes()),
            operator_key_id: "cortex-operator-tests".into(),
            signed_at,
            decay_job_id: job_id.to_string(),
            model_name: model.into(),
            prompt_template_blake3: prompt_digest.into(),
            signature_hex: String::new(),
        };
        let input = canonical_signing_input(&envelope);
        let signature = signing_key.sign(&input);
        let signature_hex = lowercase_hex(&signature.to_bytes());
        LlmSummaryOperatorAttestationEnvelope {
            signature_hex,
            ..envelope
        }
    }

    /// Serializes the envelope by hand — the struct only derives
    /// `Deserialize`, so tests build the JSON wire form explicitly.
    fn envelope_as_json(env: &LlmSummaryOperatorAttestationEnvelope) -> serde_json::Value {
        serde_json::json!({
            "schema_version": env.schema_version,
            "purpose": env.purpose,
            "operator_verifying_key_hex": env.operator_verifying_key_hex,
            "operator_key_id": env.operator_key_id,
            "signed_at": env.signed_at,
            "decay_job_id": env.decay_job_id,
            "model_name": env.model_name,
            "prompt_template_blake3": env.prompt_template_blake3,
            "signature_hex": env.signature_hex,
        })
    }

    /// No attestation path at all must map to the dedicated refusal error
    /// and its invariant.
    #[test]
    fn llm_summary_refuses_without_operator_attestation() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let job = sample_llm_job(id);
        let backend = NoopSummaryBackend;

        let err = run_llm_summary_job(&pool, &job, None, &backend).expect_err("must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryRequiresOperatorAttestation),
            "got {err:?}"
        );
        assert_eq!(
            err.invariant(),
            Some(super::super::DECAY_LLM_SUMMARY_REQUIRES_OPERATOR_ATTESTATION_INVARIANT)
        );
    }

    /// Dispatching the LLM entry point on a deterministic-method record is a
    /// validation error (checked before the attestation requirement).
    #[test]
    fn llm_summary_refuses_on_deterministic_method_record() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let mut job = sample_llm_job(id);
        job.summary_method_wire = "deterministic_concatenate".into();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, None, &backend).expect_err("must refuse");
        assert!(matches!(err, DecayError::Validation(_)), "got {err:?}");
    }

    /// With a valid attestation and sources in place, the noop backend still
    /// refuses — proving the pipeline reaches the backend call and surfaces
    /// its "not configured" detail.
    #[test]
    fn llm_summary_returns_backend_not_configured_with_noop() {
        let pool = open_pool();
        let source_id = MemoryId::new();
        seed_candidate_memory(&pool, &source_id, "alpha");

        let id = DecayJobId::new();
        let mut job = sample_llm_job(id);
        job.source_ids_json = json!([source_id.to_string()]);
        // Unique temp dir per job id so parallel test runs don't collide.
        let dir = std::env::temp_dir().join(format!("cortex-decay-test-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");

        let signing_key = SigningKey::from_bytes(&[7u8; 32]);
        let envelope = signed_envelope(
            &id,
            "claude-sonnet-4-7@1",
            "blake3:0000000000000000000000000000000000000000000000000000000000000000",
            &signing_key,
        );
        std::fs::write(
            &env_path,
            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
        )
        .unwrap();

        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("noop backend must refuse");
        match err {
            DecayError::LlmSummaryBackendCallFailed(detail) => {
                assert!(
                    detail.contains("summary_backend_not_configured"),
                    "detail: {detail}"
                );
            }
            other => panic!("expected LlmSummaryBackendCallFailed, got {other:?}"),
        }
    }

    /// Inserts a minimal candidate memory the job under test can reference
    /// as a source.
    fn seed_candidate_memory(pool: &Pool, id: &MemoryId, claim: &str) {
        let candidate = MemoryCandidate {
            id: *id,
            memory_type: "semantic".into(),
            claim: claim.into(),
            source_episodes_json: Value::Array(Vec::new()),
            source_events_json: Value::Array(vec![Value::String(
                "evt_01ARZ3NDEKTSV4RRFFQ69G5FAV".into(),
            )]),
            domains_json: Value::Array(vec![Value::String("t".into())]),
            salience_json: Value::Object(serde_json::Map::new()),
            confidence: 0.7,
            authority: "candidate".into(),
            applies_when_json: Value::Object(serde_json::Map::new()),
            does_not_apply_when_json: Value::Array(Vec::new()),
            created_at: Utc::now(),
            updated_at: Utc::now(),
        };
        MemoryRepo::new(pool).insert_candidate(&candidate).unwrap();
    }

    /// An otherwise-valid envelope signed for a DIFFERENT job id must be
    /// rejected as an attestation failure (single-use binding).
    #[test]
    fn llm_summary_rejects_envelope_for_wrong_job() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let other = DecayJobId::new();
        let job = sample_llm_job(id);

        let dir =
            std::env::temp_dir().join(format!("cortex-decay-test-wrong-job-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");

        let signing_key = SigningKey::from_bytes(&[3u8; 32]);
        // Signed for `other`, presented for `id`.
        let envelope = signed_envelope(
            &other,
            "claude-sonnet-4-7@1",
            "blake3:1111111111111111111111111111111111111111111111111111111111111111",
            &signing_key,
        );
        std::fs::write(
            &env_path,
            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
        )
        .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("envelope mismatch must refuse");
        assert!(
            matches!(err, DecayError::LlmSummaryAttestationRejected(_)),
            "got {err:?}"
        );
        assert_eq!(
            err.invariant(),
            Some(super::super::DECAY_LLM_SUMMARY_ATTESTATION_REJECTED_INVARIANT)
        );
    }

    /// Flipping the first signature byte must fail Ed25519 verification with
    /// a signature-related detail.
    #[test]
    fn llm_summary_rejects_tampered_signature() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let job = sample_llm_job(id);

        let dir = std::env::temp_dir().join(format!("cortex-decay-test-tampered-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");

        let signing_key = SigningKey::from_bytes(&[1u8; 32]);
        let mut envelope = signed_envelope(
            &id,
            "claude-sonnet-4-7@1",
            "blake3:2222222222222222222222222222222222222222222222222222222222222222",
            &signing_key,
        );
        // Corrupt the first signature byte; hex stays well-formed so the
        // failure is the signature check itself, not decoding.
        envelope.signature_hex.replace_range(0..2, "ff");
        std::fs::write(
            &env_path,
            serde_json::to_string(&envelope_as_json(&envelope)).unwrap(),
        )
        .unwrap();
        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("tampered signature must refuse");
        match err {
            DecayError::LlmSummaryAttestationRejected(detail) => {
                assert!(
                    detail.contains("signature") || detail.contains("did not verify"),
                    "detail: {detail}"
                );
            }
            other => panic!("expected LlmSummaryAttestationRejected, got {other:?}"),
        }
    }

    /// An envelope file that is not valid JSON maps to the attestation
    /// rejection error rather than an I/O panic.
    #[test]
    fn llm_summary_rejects_malformed_envelope_json() {
        let pool = open_pool();
        let id = DecayJobId::new();
        let job = sample_llm_job(id);

        let dir = std::env::temp_dir().join(format!("cortex-decay-test-bad-json-{}", id.as_ulid()));
        std::fs::create_dir_all(&dir).unwrap();
        let env_path = dir.join("attestation.json");
        std::fs::write(&env_path, "{ this is not json").unwrap();

        let backend = NoopSummaryBackend;
        let err = run_llm_summary_job(&pool, &job, Some(env_path.as_path()), &backend)
            .expect_err("malformed JSON must refuse");
        assert!(matches!(err, DecayError::LlmSummaryAttestationRejected(_)));
    }
}