// harn_vm/orchestration/lifecycle_receipts.rs

//! Replay-deterministic lifecycle receipts (#1861, epic #1853 P-08).
//!
//! Suspend / resume / drain are journal entries. Following the prior art in
//! durable-execution replay engines (Temporal, Restate, Inngest, Azure Durable
//! Functions, Cadence), these lifecycle events must return memoized values on
//! replay rather than re-execute. This module is the source of truth for the
//! persisted receipt shapes and for the helpers that `replay_oracle.rs` and the
//! conformance suite use to assert determinism end-to-end.
//!
//! Design notes:
//!
//! * **Signed timestamps.** Every receipt carries a [`SignedLifecycleTimestamp`]
//!   that pins `at_ms` to the wall time recorded during the original run. The
//!   signature is HMAC-SHA256 over `(kind, at_ms, subject_id, initiator_id)`,
//!   keyed by a per-process salt. Replay must use the journaled timestamp, not
//!   the current wall clock. [`verify_signed_timestamp`] lets the oracle prove
//!   that the on-disk timestamp was minted by this process before it accepts
//!   the timestamp as ground truth.
//! * **Cached resume inputs.** [`ResumptionReceipt`] stores both `input` and
//!   `input_hash`. On replay, the runtime feeds the cached `input` back into the
//!   suspended worker instead of re-prompting. It also asserts that any fresh
//!   input it would otherwise compute matches `input_hash`. A mismatch surfaces
//!   `HARN-SUS-011`.
//! * **Privacy.** The `input` value is journaled so that replay is
//!   self-contained. Callers holding a secret-bearing payload scrub it first:
//!   build a [`RedactionPolicy`] whose paths point at the sensitive fields. Then
//!   pass the output of [`RedactionPolicy::redact`] to [`ResumptionReceipt::new`]
//!   as `journaled_input`. Redacted fields become a
//!   `{"$harn_redacted": "<reason>"}` sentinel. [`record_resumption_receipt`]
//!   itself persists the receipt as-is and performs no redaction. The hash is
//!   always computed against the *original* payload, so determinism checks
//!   survive redaction.
//! * **Memoized drain decisions.** [`DrainDecisionReceipt`] captures the action
//!   the settlement agent chose for each drain item. On replay, the oracle reads
//!   the receipt instead of re-issuing the settlement agent's LLM call. Changing
//!   the prompt invalidates the replay, because the recorded `prompt_hash` no
//!   longer matches (`HARN-SUS-012`).
//!
//! The module is intentionally JSON-shaped at the boundary, so the existing
//! `run_record_*` and event-log persistence work without schema changes. The
//! replay oracle reads back the topic-scoped journal entries and feeds them into
//! `ReplayTraceRun::approval_interactions` / `lifecycle_audit_log`.

use std::borrow::Cow;
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::OnceLock;

use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sha2::{Digest, Sha256};
use time::format_description::well_known::Rfc3339;

use crate::event_log::{active_event_log, EventLog, LogEvent, Topic};

/// Event-log topic that carries every persisted lifecycle receipt.
///
/// One topic serves all three receipt shapes, so the replay oracle only
/// tracks a single cursor. The entry `kind` tells suspension, resumption,
/// and drain-decision receipts apart.
pub const LIFECYCLE_RECEIPT_TOPIC: &str = "agent.lifecycle.receipts";

/// `kind` discriminator for a journaled [`SuspensionReceipt`].
pub const SUSPENSION_RECEIPT_KIND: &str = "suspension_receipt";
/// `kind` discriminator for a journaled [`ResumptionReceipt`].
pub const RESUMPTION_RECEIPT_KIND: &str = "resumption_receipt";
/// `kind` discriminator for a journaled [`DrainDecisionReceipt`].
pub const DRAIN_DECISION_RECEIPT_KIND: &str = "drain_decision_receipt";

/// Algorithm label stamped into [`SignedLifecycleTimestamp::algorithm`] and
/// required by [`verify_signed_timestamp`].
pub const SIGNED_TIMESTAMP_ALGORITHM: &str = "hmac-sha256";
/// Key label stamped into [`SignedLifecycleTimestamp::key_id`] and required
/// by [`verify_signed_timestamp`].
pub const SIGNED_TIMESTAMP_KEY_ID: &str = "local-session";

65/// Per-process signing salt for lifecycle timestamps. Mirrors the
66/// channels-module pattern (`SIGNING_SALT` in `channels.rs`): a fresh
67/// salt per process so signatures cannot replay across runs, but stable
68/// for the duration of one run so the in-process replay oracle can
69/// verify what it recorded a moment earlier.
70static LIFECYCLE_SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
71
72fn lifecycle_signing_salt() -> &'static [u8] {
73    LIFECYCLE_SIGNING_SALT
74        .get_or_init(|| {
75            format!(
76                "harn-lifecycle-signing-salt:{}:{}",
77                std::process::id(),
78                uuid::Uuid::now_v7()
79            )
80            .into_bytes()
81        })
82        .as_slice()
83}
84
85/// Signed wall-clock timestamp carried on every lifecycle receipt.
86///
87/// `at_ms` is the canonical replay-time value; the human-readable RFC3339
88/// `at` mirror exists for log inspection only. `signature` proves the
89/// `(kind, at_ms, subject_id, initiator_id)` tuple was produced by this
90/// process — corruption of any of those fields after the fact will fail
91/// [`verify_signed_timestamp`] and the replay oracle will reject the
92/// receipt.
93#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
94pub struct SignedLifecycleTimestamp {
95    pub at_ms: i64,
96    pub at: String,
97    pub algorithm: String,
98    pub key_id: String,
99    pub signature: String,
100}
101
102impl SignedLifecycleTimestamp {
103    /// Build a fresh signed timestamp pinned to the current mock-aware
104    /// wall clock. The signature material binds the four
105    /// receipt-identifying fields so a journal entry that gets its
106    /// `at_ms` rewritten on disk fails verification.
107    pub fn now_for(kind: &str, subject_id: &str, initiator_id: &str) -> Self {
108        let at = crate::clock_mock::now_utc();
109        let at_ms = (at.unix_timestamp_nanos() / 1_000_000) as i64;
110        let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
111        let signature = sign_timestamp_material(kind, at_ms, subject_id, initiator_id);
112        Self {
113            at_ms,
114            at: at_text,
115            algorithm: SIGNED_TIMESTAMP_ALGORITHM.to_string(),
116            key_id: SIGNED_TIMESTAMP_KEY_ID.to_string(),
117            signature,
118        }
119    }
120}
121
122fn sign_timestamp_material(kind: &str, at_ms: i64, subject_id: &str, initiator_id: &str) -> String {
123    let material = format!(
124        "harn.lifecycle.timestamp.v1\nkind={kind}\nat_ms={at_ms}\nsubject={subject_id}\ninitiator={initiator_id}\n"
125    );
126    let mac = crate::connectors::hmac::hmac_sha256(lifecycle_signing_salt(), material.as_bytes());
127    format!("sha256:{}", hex::encode(mac))
128}
129
130/// Verify that a [`SignedLifecycleTimestamp`] was minted by this process
131/// for the given `(kind, subject_id, initiator_id)` tuple. Returns `Err`
132/// if the algorithm/key changed (forward compatibility), the signature
133/// is malformed, or the recomputed material disagrees with the stored
134/// signature. Used by the replay oracle before it accepts an on-disk
135/// receipt as ground truth.
136pub fn verify_signed_timestamp(
137    stamp: &SignedLifecycleTimestamp,
138    kind: &str,
139    subject_id: &str,
140    initiator_id: &str,
141) -> Result<(), LifecycleReceiptError> {
142    if stamp.algorithm != SIGNED_TIMESTAMP_ALGORITHM {
143        return Err(LifecycleReceiptError::SignatureAlgorithmMismatch {
144            expected: SIGNED_TIMESTAMP_ALGORITHM.to_string(),
145            found: stamp.algorithm.clone(),
146        });
147    }
148    if stamp.key_id != SIGNED_TIMESTAMP_KEY_ID {
149        return Err(LifecycleReceiptError::SignatureKeyMismatch {
150            expected: SIGNED_TIMESTAMP_KEY_ID.to_string(),
151            found: stamp.key_id.clone(),
152        });
153    }
154    let expected = sign_timestamp_material(kind, stamp.at_ms, subject_id, initiator_id);
155    if expected != stamp.signature {
156        return Err(LifecycleReceiptError::SignatureMismatch {
157            expected,
158            found: stamp.signature.clone(),
159        });
160    }
161    Ok(())
162}
163
164/// Origin of a recorded suspension. Mirrors
165/// `WorkerSuspension::initiator` and the `Suspension`-shape transcript
166/// constructor so the on-disk shape is unified across the lifecycle stack.
167#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
168#[serde(rename_all = "snake_case")]
169pub enum SuspendInitiator {
170    #[serde(rename = "self")]
171    SelfInitiated,
172    Parent,
173    #[default]
174    Operator,
175    Triggered,
176}
177
178impl SuspendInitiator {
179    pub fn as_str(self) -> &'static str {
180        match self {
181            Self::SelfInitiated => "self",
182            Self::Parent => "parent",
183            Self::Operator => "operator",
184            Self::Triggered => "triggered",
185        }
186    }
187}
188
189/// Origin of a recorded resumption. Aligned with
190/// `helpers::transcript::ResumptionInitiator`.
191#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
192#[serde(rename_all = "snake_case")]
193pub enum ResumeInitiator {
194    #[default]
195    Parent,
196    Operator,
197    Triggered,
198    DrainAgent,
199    Timeout,
200}
201
202impl ResumeInitiator {
203    pub fn as_str(self) -> &'static str {
204        match self {
205            Self::Parent => "parent",
206            Self::Operator => "operator",
207            Self::Triggered => "triggered",
208            Self::DrainAgent => "drain_agent",
209            Self::Timeout => "timeout",
210        }
211    }
212
213    pub fn parse(value: &str) -> Self {
214        match value.trim() {
215            "operator" => Self::Operator,
216            "triggered" | "trigger" => Self::Triggered,
217            "drain_agent" | "drain-agent" | "settlement" => Self::DrainAgent,
218            "timeout" => Self::Timeout,
219            _ => Self::Parent,
220        }
221    }
222}
223
224/// What an auto-resume trigger matched against. Carried on a
225/// [`ResumptionReceipt`] when a trigger drove the resume so replay can
226/// reproduce the exact match without re-firing the connector.
227#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
228pub struct TriggerMatchInfo {
229    pub source: String,
230    pub event_id: String,
231    pub filter_summary: String,
232}
233
234/// Drain decision categories. Mirrors
235/// `DrainDecisionItemCategory` in `helpers::transcript`.
236#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
237#[serde(rename_all = "snake_case")]
238pub enum DrainItemCategory {
239    #[default]
240    SuspendedSubagent,
241    QueuedTrigger,
242    PartialHandoff,
243    InFlightLlmCall,
244}
245
246#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
247pub struct DrainItem {
248    pub category: DrainItemCategory,
249    pub id: String,
250    pub summary: String,
251}
252
253/// Action the settlement agent chose for a drain item.
254#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
255#[serde(rename_all = "snake_case")]
256pub enum DrainAction {
257    #[default]
258    Resume,
259    Cancel,
260    Handoff,
261    Acknowledge,
262    Defer,
263    Wait,
264    Finalize,
265}
266
267/// Persisted suspension event. Recorded at the moment a worker enters
268/// the `suspended` state, replayed verbatim from the journal during a
269/// second run.
270#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
271pub struct SuspensionReceipt {
272    pub handle: String,
273    pub session_id: Option<String>,
274    pub initiator: SuspendInitiator,
275    pub initiator_id: String,
276    pub reason: String,
277    #[serde(default, skip_serializing_if = "Option::is_none")]
278    pub conditions: Option<JsonValue>,
279    pub suspended_at: SignedLifecycleTimestamp,
280    #[serde(default, skip_serializing_if = "Option::is_none")]
281    pub span_id: Option<String>,
282}
283
284impl SuspensionReceipt {
285    /// Build a fresh receipt and sign its timestamp.
286    pub fn new(
287        handle: impl Into<String>,
288        session_id: Option<String>,
289        initiator: SuspendInitiator,
290        initiator_id: impl Into<String>,
291        reason: impl Into<String>,
292        conditions: Option<JsonValue>,
293        span_id: Option<String>,
294    ) -> Self {
295        let handle = handle.into();
296        let initiator_id = initiator_id.into();
297        let suspended_at =
298            SignedLifecycleTimestamp::now_for(SUSPENSION_RECEIPT_KIND, &handle, &initiator_id);
299        Self {
300            handle,
301            session_id,
302            initiator,
303            initiator_id,
304            reason: reason.into(),
305            conditions,
306            suspended_at,
307            span_id,
308        }
309    }
310
311    /// Verify the receipt's signed timestamp. Wraps
312    /// [`verify_signed_timestamp`] with the receipt-specific subject /
313    /// initiator binding.
314    pub fn verify_signature(&self) -> Result<(), LifecycleReceiptError> {
315        verify_signed_timestamp(
316            &self.suspended_at,
317            SUSPENSION_RECEIPT_KIND,
318            &self.handle,
319            &self.initiator_id,
320        )
321    }
322}
323
324/// Persisted resumption event. The `input` field is the cached resume
325/// input (post-redaction); `input_hash` is the deterministic
326/// fingerprint of the *unredacted* payload so replay can detect drift
327/// even when the journaled value was scrubbed.
328#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
329pub struct ResumptionReceipt {
330    pub handle: String,
331    pub session_id: Option<String>,
332    pub initiator: ResumeInitiator,
333    pub initiator_id: String,
334    #[serde(default, skip_serializing_if = "Option::is_none")]
335    pub input: Option<JsonValue>,
336    pub input_hash: String,
337    pub continue_transcript: bool,
338    #[serde(default, skip_serializing_if = "Option::is_none")]
339    pub linked_suspension_span_id: Option<String>,
340    #[serde(default, skip_serializing_if = "Option::is_none")]
341    pub trigger_match: Option<TriggerMatchInfo>,
342    pub resumed_at: SignedLifecycleTimestamp,
343}
344
345impl ResumptionReceipt {
346    /// Build a fresh resumption receipt. The hash is computed against
347    /// `original_input` so post-redaction values still validate against
348    /// the unredacted fingerprint. `journaled_input` may be the
349    /// post-redaction value (or `None` if the caller does not want to
350    /// persist the payload at all).
351    #[allow(clippy::too_many_arguments)]
352    pub fn new(
353        handle: impl Into<String>,
354        session_id: Option<String>,
355        initiator: ResumeInitiator,
356        initiator_id: impl Into<String>,
357        original_input: Option<&JsonValue>,
358        journaled_input: Option<JsonValue>,
359        continue_transcript: bool,
360        linked_suspension_span_id: Option<String>,
361        trigger_match: Option<TriggerMatchInfo>,
362    ) -> Self {
363        let handle = handle.into();
364        let initiator_id = initiator_id.into();
365        let resumed_at =
366            SignedLifecycleTimestamp::now_for(RESUMPTION_RECEIPT_KIND, &handle, &initiator_id);
367        let input_hash = hash_resume_input(original_input);
368        Self {
369            handle,
370            session_id,
371            initiator,
372            initiator_id,
373            input: journaled_input,
374            input_hash,
375            continue_transcript,
376            linked_suspension_span_id,
377            trigger_match,
378            resumed_at,
379        }
380    }
381
382    pub fn verify_signature(&self) -> Result<(), LifecycleReceiptError> {
383        verify_signed_timestamp(
384            &self.resumed_at,
385            RESUMPTION_RECEIPT_KIND,
386            &self.handle,
387            &self.initiator_id,
388        )
389    }
390}
391
392/// Persisted drain decision. Recorded by the settlement agent so replay
393/// can short-circuit the LLM call that produced `action`.
394#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
395pub struct DrainDecisionReceipt {
396    pub pipeline_id: String,
397    pub item: DrainItem,
398    pub action: DrainAction,
399    pub reason: String,
400    pub decided_by: String,
401    pub decided_at: SignedLifecycleTimestamp,
402    #[serde(default, skip_serializing_if = "Option::is_none")]
403    pub prompt_hash: Option<String>,
404}
405
406impl DrainDecisionReceipt {
407    pub fn new(
408        pipeline_id: impl Into<String>,
409        item: DrainItem,
410        action: DrainAction,
411        reason: impl Into<String>,
412        decided_by: impl Into<String>,
413        prompt_hash: Option<String>,
414    ) -> Self {
415        let pipeline_id = pipeline_id.into();
416        let decided_by = decided_by.into();
417        let decided_at = SignedLifecycleTimestamp::now_for(
418            DRAIN_DECISION_RECEIPT_KIND,
419            &pipeline_id,
420            &decided_by,
421        );
422        Self {
423            pipeline_id,
424            item,
425            action,
426            reason: reason.into(),
427            decided_by,
428            decided_at,
429            prompt_hash,
430        }
431    }
432
433    pub fn verify_signature(&self) -> Result<(), LifecycleReceiptError> {
434        verify_signed_timestamp(
435            &self.decided_at,
436            DRAIN_DECISION_RECEIPT_KIND,
437            &self.pipeline_id,
438            &self.decided_by,
439        )
440    }
441}
442
/// Errors surfaced when minting, verifying, or replaying receipts.
///
/// `Display` prefixes each message with its `HARN-SUS-*` diagnostic code
/// where one applies.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LifecycleReceiptError {
    /// Stamp's `algorithm` is not the one this build signs with (HARN-SUS-013).
    SignatureAlgorithmMismatch { expected: String, found: String },
    /// Stamp's `key_id` is not the one this build signs with (HARN-SUS-013).
    SignatureKeyMismatch { expected: String, found: String },
    /// Recomputed signature disagrees with the stored one (HARN-SUS-013).
    SignatureMismatch { expected: String, found: String },
    /// Replayed resume input hashes differently from the recording (HARN-SUS-011).
    ResumeInputHashMismatch {
        handle: String,
        expected_hash: String,
        actual_hash: String,
    },
    /// Replayed settlement prompt hashes differently from the recording (HARN-SUS-012).
    DrainDecisionPromptHashMismatch {
        item_id: String,
        expected_hash: String,
        actual_hash: String,
    },
    /// Free-form persistence failure. Not constructed in this module.
    Persistence(String),
}

impl std::fmt::Display for LifecycleReceiptError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use LifecycleReceiptError as E;
        match self {
            E::SignatureAlgorithmMismatch { expected, found } => {
                write!(
                    f,
                    "HARN-SUS-013 lifecycle signature algorithm mismatch (expected {expected}, found {found})"
                )
            }
            E::SignatureKeyMismatch { expected, found } => {
                write!(
                    f,
                    "HARN-SUS-013 lifecycle signature key mismatch (expected {expected}, found {found})"
                )
            }
            E::SignatureMismatch { expected, found } => {
                write!(
                    f,
                    "HARN-SUS-013 lifecycle signature mismatch (expected {expected}, found {found})"
                )
            }
            E::ResumeInputHashMismatch {
                handle,
                expected_hash,
                actual_hash,
            } => {
                write!(
                    f,
                    "HARN-SUS-011 replay resume input hash mismatch for {handle} (expected {expected_hash}, got {actual_hash})"
                )
            }
            E::DrainDecisionPromptHashMismatch {
                item_id,
                expected_hash,
                actual_hash,
            } => {
                write!(
                    f,
                    "HARN-SUS-012 replay drain decision prompt hash mismatch for {item_id} (expected {expected_hash}, got {actual_hash})"
                )
            }
            E::Persistence(message) => {
                write!(f, "lifecycle receipt persistence: {message}")
            }
        }
    }
}

impl std::error::Error for LifecycleReceiptError {}

509/// Deterministic hash used by [`ResumptionReceipt::input_hash`] and
510/// the replay-oracle comparator. We hash canonical JSON so map-key order
511/// doesn't drift between record and replay.
512pub fn hash_resume_input(input: Option<&JsonValue>) -> String {
513    let canonical = canonical_json_bytes(input.unwrap_or(&JsonValue::Null));
514    let digest = Sha256::digest(&canonical);
515    format!("sha256:{}", hex::encode(digest))
516}
517
518/// Hash a settlement-agent prompt fingerprint. The settlement agent
519/// records this before its LLM call so replay can detect prompt drift
520/// without re-running the model.
521pub fn hash_drain_decision_prompt(prompt: &str) -> String {
522    let digest = Sha256::digest(prompt.as_bytes());
523    format!("sha256:{}", hex::encode(digest))
524}
525
526fn canonical_json_bytes(value: &JsonValue) -> Vec<u8> {
527    let canonical = canonicalize_for_hash(value);
528    serde_json::to_vec(&canonical).unwrap_or_default()
529}
530
531fn canonicalize_for_hash(value: &JsonValue) -> JsonValue {
532    match value {
533        JsonValue::Object(map) => {
534            let mut sorted = serde_json::Map::new();
535            let mut keys: Vec<&String> = map.keys().collect();
536            keys.sort();
537            for key in keys {
538                if let Some(v) = map.get(key) {
539                    sorted.insert(key.clone(), canonicalize_for_hash(v));
540                }
541            }
542            JsonValue::Object(sorted)
543        }
544        JsonValue::Array(items) => {
545            JsonValue::Array(items.iter().map(canonicalize_for_hash).collect())
546        }
547        other => other.clone(),
548    }
549}
550
/// Rules for scrubbing a `ResumptionReceipt.input` value before it is
/// journaled.
///
/// Each rule names a JSON-pointer path. The value at that path is replaced
/// with the sentinel `{"$harn_redacted": "<reason>"}`. Redaction does not
/// affect `input_hash`, which is always computed from the unredacted
/// original, so determinism checks survive redaction.
#[derive(Clone, Debug, Default)]
pub struct RedactionPolicy {
    /// Rules, applied in order by [`RedactionPolicy::redact`].
    pub paths: Vec<RedactionPath>,
}

/// A single redaction rule: where to redact and why.
#[derive(Clone, Debug)]
pub struct RedactionPath {
    /// JSON-pointer path of the value to replace.
    pub pointer: String,
    /// Text stored under `$harn_redacted` in the sentinel.
    pub reason: String,
}

567impl RedactionPolicy {
568    pub fn redact(&self, value: &JsonValue) -> JsonValue {
569        let mut working = value.clone();
570        for rule in &self.paths {
571            apply_redaction(&mut working, &rule.pointer, &rule.reason);
572        }
573        working
574    }
575}
576
577fn apply_redaction(value: &mut JsonValue, pointer: &str, reason: &str) {
578    if pointer.is_empty() || pointer == "/" {
579        *value = serde_json::json!({"$harn_redacted": reason});
580        return;
581    }
582    let segments: Vec<&str> = pointer
583        .strip_prefix('/')
584        .unwrap_or(pointer)
585        .split('/')
586        .collect();
587    redact_segments(value, &segments, reason);
588}
589
590fn redact_segments(value: &mut JsonValue, segments: &[&str], reason: &str) {
591    if segments.is_empty() {
592        *value = serde_json::json!({"$harn_redacted": reason});
593        return;
594    }
595    let head = segments[0];
596    let tail = &segments[1..];
597    match value {
598        JsonValue::Object(map) => {
599            if let Some(child) = map.get_mut(head) {
600                redact_segments(child, tail, reason);
601            }
602        }
603        JsonValue::Array(items) => {
604            if let Ok(index) = head.parse::<usize>() {
605                if let Some(child) = items.get_mut(index) {
606                    redact_segments(child, tail, reason);
607                }
608            }
609        }
610        _ => {}
611    }
612}
613
614thread_local! {
615    static LIFECYCLE_RECEIPT_LOG: RefCell<Vec<LifecycleReceiptEntry>> = const { RefCell::new(Vec::new()) };
616    static LIFECYCLE_RECEIPT_SEQ: RefCell<u64> = const { RefCell::new(0) };
617}
618
619/// Reset the in-memory receipt registry. Called from `reset_thread_local_state`
620/// so test harnesses do not carry receipts between runs.
621pub fn reset_lifecycle_receipt_registry() {
622    LIFECYCLE_RECEIPT_LOG.with(|log| log.borrow_mut().clear());
623    LIFECYCLE_RECEIPT_SEQ.with(|seq| *seq.borrow_mut() = 0);
624}
625
626/// A receipt + its monotonic seq + its on-disk kind. The seq is stable
627/// across recordings within a single thread, which is what conformance
628/// fixtures need for deterministic byte-identical comparison.
629#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
630pub struct LifecycleReceiptEntry {
631    pub seq: u64,
632    pub kind: String,
633    pub payload: JsonValue,
634}
635
636impl LifecycleReceiptEntry {
637    pub fn to_json(&self) -> JsonValue {
638        serde_json::json!({
639            "seq": self.seq,
640            "kind": &self.kind,
641            "payload": &self.payload,
642        })
643    }
644}
645
646fn next_seq() -> u64 {
647    LIFECYCLE_RECEIPT_SEQ.with(|seq| {
648        let mut slot = seq.borrow_mut();
649        *slot += 1;
650        *slot
651    })
652}
653
654fn record_entry(kind: &str, payload: JsonValue) -> LifecycleReceiptEntry {
655    let entry = LifecycleReceiptEntry {
656        seq: next_seq(),
657        kind: kind.to_string(),
658        payload,
659    };
660    LIFECYCLE_RECEIPT_LOG.with(|log| log.borrow_mut().push(entry.clone()));
661    persist_entry(&entry);
662    entry
663}
664
665fn persist_entry(entry: &LifecycleReceiptEntry) {
666    let Some(log) = active_event_log() else {
667        return;
668    };
669    let Ok(topic) = Topic::new(LIFECYCLE_RECEIPT_TOPIC) else {
670        return;
671    };
672    let mut headers = BTreeMap::new();
673    headers.insert("kind".to_string(), entry.kind.clone());
674    headers.insert("seq".to_string(), entry.seq.to_string());
675    let event = LogEvent::new(entry.kind.clone(), entry.payload.clone()).with_headers(headers);
676    let _ = futures::executor::block_on(log.append(&topic, event));
677}
678
679/// Persist a suspension receipt and return the journaled entry.
680pub fn record_suspension_receipt(receipt: &SuspensionReceipt) -> LifecycleReceiptEntry {
681    let payload = serde_json::to_value(receipt).unwrap_or(JsonValue::Null);
682    record_entry(SUSPENSION_RECEIPT_KIND, payload)
683}
684
685/// Persist a resumption receipt and return the journaled entry.
686pub fn record_resumption_receipt(receipt: &ResumptionReceipt) -> LifecycleReceiptEntry {
687    let payload = serde_json::to_value(receipt).unwrap_or(JsonValue::Null);
688    record_entry(RESUMPTION_RECEIPT_KIND, payload)
689}
690
691/// Persist a drain decision receipt and return the journaled entry.
692pub fn record_drain_decision_receipt(receipt: &DrainDecisionReceipt) -> LifecycleReceiptEntry {
693    let payload = serde_json::to_value(receipt).unwrap_or(JsonValue::Null);
694    record_entry(DRAIN_DECISION_RECEIPT_KIND, payload)
695}
696
697/// Snapshot every recorded receipt in seq order. Used by the replay
698/// oracle (and conformance fixtures) to read back the journal.
699pub fn lifecycle_receipts_snapshot() -> Vec<LifecycleReceiptEntry> {
700    LIFECYCLE_RECEIPT_LOG.with(|log| log.borrow().clone())
701}
702
703/// Verify the [`ResumptionReceipt`] input hash matches a fresh
704/// candidate. Used during replay: the runtime computes the hash of the
705/// resume input it would otherwise apply and asks the receipt to
706/// confirm. Returns the cached input on success so the caller can feed
707/// it back into the suspended worker.
708pub fn replay_resume_input(
709    receipt: &ResumptionReceipt,
710    candidate_input: Option<&JsonValue>,
711) -> Result<Option<JsonValue>, LifecycleReceiptError> {
712    receipt.verify_signature()?;
713    let actual = hash_resume_input(candidate_input);
714    if actual != receipt.input_hash {
715        return Err(LifecycleReceiptError::ResumeInputHashMismatch {
716            handle: receipt.handle.clone(),
717            expected_hash: receipt.input_hash.clone(),
718            actual_hash: actual,
719        });
720    }
721    Ok(receipt.input.clone())
722}
723
724/// Verify the cached drain decision prompt matches the candidate
725/// settlement-agent prompt at replay time. Returns the recorded
726/// decision so the replay path can skip re-spawning the agent.
727pub fn replay_drain_decision(
728    receipt: &DrainDecisionReceipt,
729    candidate_prompt: Option<&str>,
730) -> Result<DrainAction, LifecycleReceiptError> {
731    receipt.verify_signature()?;
732    if let (Some(prompt), Some(expected)) = (candidate_prompt, receipt.prompt_hash.as_ref()) {
733        let actual = hash_drain_decision_prompt(prompt);
734        if &actual != expected {
735            return Err(LifecycleReceiptError::DrainDecisionPromptHashMismatch {
736                item_id: receipt.item.id.clone(),
737                expected_hash: expected.clone(),
738                actual_hash: actual,
739            });
740        }
741    }
742    Ok(receipt.action)
743}
744
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Reset this thread's receipt registry so seq numbering restarts at 1.
    fn fresh() {
        reset_lifecycle_receipt_registry();
    }

    #[test]
    fn suspension_receipt_signs_and_verifies() {
        fresh();
        let receipt = SuspensionReceipt::new(
            "worker://triage/42",
            Some("session-1".to_string()),
            SuspendInitiator::Operator,
            "operator-1",
            "waiting for human approval",
            Some(json!({"kind": "approval"})),
            Some("span-1".to_string()),
        );
        receipt
            .verify_signature()
            .expect("signed timestamp verifies");

        let mut tampered = receipt.clone();
        tampered.suspended_at.at_ms += 1;
        assert!(matches!(
            tampered.verify_signature(),
            Err(LifecycleReceiptError::SignatureMismatch { .. })
        ));
    }

    #[test]
    fn signature_rejects_tampered_initiator_algorithm_and_key() {
        let receipt = ResumptionReceipt::new(
            "worker://triage/42",
            None,
            ResumeInitiator::Parent,
            "parent-1",
            None,
            None,
            false,
            None,
            None,
        );
        receipt.verify_signature().expect("fresh receipt verifies");

        let mut wrong_initiator = receipt.clone();
        wrong_initiator.initiator_id = "parent-2".to_string();
        assert!(matches!(
            wrong_initiator.verify_signature(),
            Err(LifecycleReceiptError::SignatureMismatch { .. })
        ));

        let mut wrong_algorithm = receipt.clone();
        wrong_algorithm.resumed_at.algorithm = "ed25519".to_string();
        assert!(matches!(
            wrong_algorithm.verify_signature(),
            Err(LifecycleReceiptError::SignatureAlgorithmMismatch { .. })
        ));

        let mut wrong_key = receipt;
        wrong_key.resumed_at.key_id = "other-key".to_string();
        assert!(matches!(
            wrong_key.verify_signature(),
            Err(LifecycleReceiptError::SignatureKeyMismatch { .. })
        ));
    }

    #[test]
    fn resumption_receipt_round_trips_input_hash() {
        fresh();
        let original = json!({"approved": true, "comment": "ship it"});
        let receipt = ResumptionReceipt::new(
            "worker://triage/42",
            Some("session-1".to_string()),
            ResumeInitiator::Operator,
            "operator-1",
            Some(&original),
            Some(original.clone()),
            true,
            None,
            None,
        );

        let cached = replay_resume_input(&receipt, Some(&original)).expect("matches");
        assert_eq!(cached, Some(original.clone()));

        let drift = json!({"approved": false, "comment": "ship it"});
        let mismatch = replay_resume_input(&receipt, Some(&drift));
        assert!(matches!(
            mismatch,
            Err(LifecycleReceiptError::ResumeInputHashMismatch { .. })
        ));
    }

    #[test]
    fn resumption_hash_is_canonical_across_map_key_order() {
        let a = json!({"a": 1, "b": 2});
        let b = json!({"b": 2, "a": 1});
        assert_eq!(hash_resume_input(Some(&a)), hash_resume_input(Some(&b)));
    }

    #[test]
    fn redaction_policy_preserves_hash() {
        let original = json!({
            "user": "alice",
            "secret_token": "very-secret",
            "approved": true,
        });
        let policy = RedactionPolicy {
            paths: vec![RedactionPath {
                pointer: "/secret_token".to_string(),
                reason: "auth_token".to_string(),
            }],
        };
        let redacted = policy.redact(&original);
        assert_ne!(redacted, original);
        assert_eq!(
            redacted["secret_token"],
            json!({"$harn_redacted": "auth_token"})
        );
        // Hash is computed against the original — replay still works.
        let receipt = ResumptionReceipt::new(
            "worker://x",
            None,
            ResumeInitiator::Operator,
            "op-1",
            Some(&original),
            Some(redacted.clone()),
            true,
            None,
            None,
        );
        let cached = replay_resume_input(&receipt, Some(&original)).expect("matches");
        assert_eq!(cached, Some(redacted));
    }

    #[test]
    fn redaction_handles_arrays_root_and_missing_paths() {
        let original = json!({"items": [{"token": "a"}, {"token": "b"}], "keep": 1});
        let policy = RedactionPolicy {
            paths: vec![
                RedactionPath {
                    pointer: "/items/1/token".to_string(),
                    reason: "secret".to_string(),
                },
                RedactionPath {
                    pointer: "/missing/path".to_string(),
                    reason: "unused".to_string(),
                },
            ],
        };
        assert_eq!(
            policy.redact(&original),
            json!({
                "items": [{"token": "a"}, {"token": {"$harn_redacted": "secret"}}],
                "keep": 1,
            })
        );

        let whole = RedactionPolicy {
            paths: vec![RedactionPath {
                pointer: String::new(),
                reason: "all".to_string(),
            }],
        };
        assert_eq!(whole.redact(&original), json!({"$harn_redacted": "all"}));
    }

    #[test]
    fn drain_decision_receipt_memoizes_action() {
        fresh();
        let prompt = "settle this drain item";
        let receipt = DrainDecisionReceipt::new(
            "pipeline-1",
            DrainItem {
                category: DrainItemCategory::SuspendedSubagent,
                id: "worker://triage/42".to_string(),
                summary: "worker is suspended".to_string(),
            },
            DrainAction::Resume,
            "settlement agent picked resume".to_string(),
            "settlement-session-1",
            Some(hash_drain_decision_prompt(prompt)),
        );

        let action = replay_drain_decision(&receipt, Some(prompt)).expect("matches");
        assert_eq!(action, DrainAction::Resume);

        let drift = replay_drain_decision(&receipt, Some("a different prompt"));
        assert!(matches!(
            drift,
            Err(LifecycleReceiptError::DrainDecisionPromptHashMismatch { .. })
        ));
    }

    #[test]
    fn drain_decision_without_prompt_hash_skips_drift_check() {
        let receipt = DrainDecisionReceipt::new(
            "pipeline-1",
            DrainItem::default(),
            DrainAction::Cancel,
            "cancel it",
            "settlement-session-1",
            None,
        );
        assert_eq!(
            replay_drain_decision(&receipt, Some("any prompt at all")),
            Ok(DrainAction::Cancel)
        );
        assert_eq!(replay_drain_decision(&receipt, None), Ok(DrainAction::Cancel));
    }

    #[test]
    fn resume_initiator_parse_accepts_aliases() {
        assert_eq!(ResumeInitiator::parse(" operator "), ResumeInitiator::Operator);
        assert_eq!(ResumeInitiator::parse("trigger"), ResumeInitiator::Triggered);
        assert_eq!(ResumeInitiator::parse("drain-agent"), ResumeInitiator::DrainAgent);
        assert_eq!(ResumeInitiator::parse("settlement"), ResumeInitiator::DrainAgent);
        assert_eq!(ResumeInitiator::parse("something-else"), ResumeInitiator::Parent);
        for initiator in [
            ResumeInitiator::Parent,
            ResumeInitiator::Operator,
            ResumeInitiator::Triggered,
            ResumeInitiator::DrainAgent,
            ResumeInitiator::Timeout,
        ] {
            assert_eq!(ResumeInitiator::parse(initiator.as_str()), initiator);
        }
    }

    #[test]
    fn record_then_snapshot_byte_identical_across_runs() {
        fresh();
        let s = SuspensionReceipt::new(
            "worker://x",
            None,
            SuspendInitiator::Operator,
            "op-1",
            "reason",
            None,
            None,
        );
        let entry_a = record_suspension_receipt(&s);
        fresh();
        let entry_b = record_suspension_receipt(&s);
        // Recording serializes the receipt as-is; it is never re-signed.
        // Re-recording the same receipt after a reset must therefore
        // reproduce the journaled entry byte-for-byte: same seq (1), same
        // kind, same payload. That is the property the replay oracle
        // relies on.
        assert_eq!(entry_b.seq, 1);
        assert_eq!(entry_a, entry_b);
        assert_eq!(
            serde_json::to_vec(&entry_a).expect("entry serializes"),
            serde_json::to_vec(&entry_b).expect("entry serializes"),
        );
    }

    #[test]
    fn snapshot_returns_recorded_entries_in_seq_order() {
        fresh();
        let s = SuspensionReceipt::new(
            "worker://x",
            None,
            SuspendInitiator::Operator,
            "op-1",
            "reason",
            None,
            None,
        );
        record_suspension_receipt(&s);
        let r = ResumptionReceipt::new(
            "worker://x",
            None,
            ResumeInitiator::Operator,
            "op-1",
            None,
            None,
            true,
            None,
            None,
        );
        record_resumption_receipt(&r);
        let snapshot = lifecycle_receipts_snapshot();
        assert_eq!(snapshot.len(), 2);
        assert_eq!(snapshot[0].seq, 1);
        assert_eq!(snapshot[0].kind, SUSPENSION_RECEIPT_KIND);
        assert_eq!(snapshot[1].seq, 2);
        assert_eq!(snapshot[1].kind, RESUMPTION_RECEIPT_KIND);
    }
}