use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::OnceLock;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use sha2::{Digest, Sha256};
use time::format_description::well_known::Rfc3339;
use crate::event_log::{active_event_log, EventLog, LogEvent, Topic};
pub const LIFECYCLE_RECEIPT_TOPIC: &str = "agent.lifecycle.receipts";
pub const SUSPENSION_RECEIPT_KIND: &str = "suspension_receipt";
pub const RESUMPTION_RECEIPT_KIND: &str = "resumption_receipt";
pub const DRAIN_DECISION_RECEIPT_KIND: &str = "drain_decision_receipt";
pub const SIGNED_TIMESTAMP_ALGORITHM: &str = "hmac-sha256";
pub const SIGNED_TIMESTAMP_KEY_ID: &str = "local-session";
static LIFECYCLE_SIGNING_SALT: OnceLock<Vec<u8>> = OnceLock::new();
fn lifecycle_signing_salt() -> &'static [u8] {
LIFECYCLE_SIGNING_SALT
.get_or_init(|| {
format!(
"harn-lifecycle-signing-salt:{}:{}",
std::process::id(),
uuid::Uuid::now_v7()
)
.into_bytes()
})
.as_slice()
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SignedLifecycleTimestamp {
pub at_ms: i64,
pub at: String,
pub algorithm: String,
pub key_id: String,
pub signature: String,
}
impl SignedLifecycleTimestamp {
pub fn now_for(kind: &str, subject_id: &str, initiator_id: &str) -> Self {
let at = crate::clock_mock::now_utc();
let at_ms = (at.unix_timestamp_nanos() / 1_000_000) as i64;
let at_text = at.format(&Rfc3339).unwrap_or_else(|_| at.to_string());
let signature = sign_timestamp_material(kind, at_ms, subject_id, initiator_id);
Self {
at_ms,
at: at_text,
algorithm: SIGNED_TIMESTAMP_ALGORITHM.to_string(),
key_id: SIGNED_TIMESTAMP_KEY_ID.to_string(),
signature,
}
}
}
fn sign_timestamp_material(kind: &str, at_ms: i64, subject_id: &str, initiator_id: &str) -> String {
let material = format!(
"harn.lifecycle.timestamp.v1\nkind={kind}\nat_ms={at_ms}\nsubject={subject_id}\ninitiator={initiator_id}\n"
);
let mac = crate::connectors::hmac::hmac_sha256(lifecycle_signing_salt(), material.as_bytes());
format!("sha256:{}", hex::encode(mac))
}
pub fn verify_signed_timestamp(
stamp: &SignedLifecycleTimestamp,
kind: &str,
subject_id: &str,
initiator_id: &str,
) -> Result<(), LifecycleReceiptError> {
if stamp.algorithm != SIGNED_TIMESTAMP_ALGORITHM {
return Err(LifecycleReceiptError::SignatureAlgorithmMismatch {
expected: SIGNED_TIMESTAMP_ALGORITHM.to_string(),
found: stamp.algorithm.clone(),
});
}
if stamp.key_id != SIGNED_TIMESTAMP_KEY_ID {
return Err(LifecycleReceiptError::SignatureKeyMismatch {
expected: SIGNED_TIMESTAMP_KEY_ID.to_string(),
found: stamp.key_id.clone(),
});
}
let expected = sign_timestamp_material(kind, stamp.at_ms, subject_id, initiator_id);
if expected != stamp.signature {
return Err(LifecycleReceiptError::SignatureMismatch {
expected,
found: stamp.signature.clone(),
});
}
Ok(())
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SuspendInitiator {
#[serde(rename = "self")]
SelfInitiated,
Parent,
#[default]
Operator,
Triggered,
}
impl SuspendInitiator {
pub fn as_str(self) -> &'static str {
match self {
Self::SelfInitiated => "self",
Self::Parent => "parent",
Self::Operator => "operator",
Self::Triggered => "triggered",
}
}
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ResumeInitiator {
#[default]
Parent,
Operator,
Triggered,
DrainAgent,
Timeout,
}
impl ResumeInitiator {
pub fn as_str(self) -> &'static str {
match self {
Self::Parent => "parent",
Self::Operator => "operator",
Self::Triggered => "triggered",
Self::DrainAgent => "drain_agent",
Self::Timeout => "timeout",
}
}
pub fn parse(value: &str) -> Self {
match value.trim() {
"operator" => Self::Operator,
"triggered" | "trigger" => Self::Triggered,
"drain_agent" | "drain-agent" | "settlement" => Self::DrainAgent,
"timeout" => Self::Timeout,
_ => Self::Parent,
}
}
}
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct TriggerMatchInfo {
pub source: String,
pub event_id: String,
pub filter_summary: String,
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DrainItemCategory {
#[default]
SuspendedSubagent,
QueuedTrigger,
PartialHandoff,
InFlightLlmCall,
}
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
pub struct DrainItem {
pub category: DrainItemCategory,
pub id: String,
pub summary: String,
}
#[derive(Clone, Copy, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum DrainAction {
#[default]
Resume,
Cancel,
Handoff,
Acknowledge,
Defer,
Wait,
Finalize,
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SuspensionReceipt {
pub handle: String,
pub session_id: Option<String>,
pub initiator: SuspendInitiator,
pub initiator_id: String,
pub reason: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub conditions: Option<JsonValue>,
pub suspended_at: SignedLifecycleTimestamp,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub span_id: Option<String>,
}
impl SuspensionReceipt {
pub fn new(
handle: impl Into<String>,
session_id: Option<String>,
initiator: SuspendInitiator,
initiator_id: impl Into<String>,
reason: impl Into<String>,
conditions: Option<JsonValue>,
span_id: Option<String>,
) -> Self {
let handle = handle.into();
let initiator_id = initiator_id.into();
let suspended_at =
SignedLifecycleTimestamp::now_for(SUSPENSION_RECEIPT_KIND, &handle, &initiator_id);
Self {
handle,
session_id,
initiator,
initiator_id,
reason: reason.into(),
conditions,
suspended_at,
span_id,
}
}
pub fn verify_signature(&self) -> Result<(), LifecycleReceiptError> {
verify_signed_timestamp(
&self.suspended_at,
SUSPENSION_RECEIPT_KIND,
&self.handle,
&self.initiator_id,
)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ResumptionReceipt {
pub handle: String,
pub session_id: Option<String>,
pub initiator: ResumeInitiator,
pub initiator_id: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub input: Option<JsonValue>,
pub input_hash: String,
pub continue_transcript: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub linked_suspension_span_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub trigger_match: Option<TriggerMatchInfo>,
pub resumed_at: SignedLifecycleTimestamp,
}
impl ResumptionReceipt {
#[allow(clippy::too_many_arguments)]
pub fn new(
handle: impl Into<String>,
session_id: Option<String>,
initiator: ResumeInitiator,
initiator_id: impl Into<String>,
original_input: Option<&JsonValue>,
journaled_input: Option<JsonValue>,
continue_transcript: bool,
linked_suspension_span_id: Option<String>,
trigger_match: Option<TriggerMatchInfo>,
) -> Self {
let handle = handle.into();
let initiator_id = initiator_id.into();
let resumed_at =
SignedLifecycleTimestamp::now_for(RESUMPTION_RECEIPT_KIND, &handle, &initiator_id);
let input_hash = hash_resume_input(original_input);
Self {
handle,
session_id,
initiator,
initiator_id,
input: journaled_input,
input_hash,
continue_transcript,
linked_suspension_span_id,
trigger_match,
resumed_at,
}
}
pub fn verify_signature(&self) -> Result<(), LifecycleReceiptError> {
verify_signed_timestamp(
&self.resumed_at,
RESUMPTION_RECEIPT_KIND,
&self.handle,
&self.initiator_id,
)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct DrainDecisionReceipt {
pub pipeline_id: String,
pub item: DrainItem,
pub action: DrainAction,
pub reason: String,
pub decided_by: String,
pub decided_at: SignedLifecycleTimestamp,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub prompt_hash: Option<String>,
}
impl DrainDecisionReceipt {
pub fn new(
pipeline_id: impl Into<String>,
item: DrainItem,
action: DrainAction,
reason: impl Into<String>,
decided_by: impl Into<String>,
prompt_hash: Option<String>,
) -> Self {
let pipeline_id = pipeline_id.into();
let decided_by = decided_by.into();
let decided_at = SignedLifecycleTimestamp::now_for(
DRAIN_DECISION_RECEIPT_KIND,
&pipeline_id,
&decided_by,
);
Self {
pipeline_id,
item,
action,
reason: reason.into(),
decided_by,
decided_at,
prompt_hash,
}
}
pub fn verify_signature(&self) -> Result<(), LifecycleReceiptError> {
verify_signed_timestamp(
&self.decided_at,
DRAIN_DECISION_RECEIPT_KIND,
&self.pipeline_id,
&self.decided_by,
)
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LifecycleReceiptError {
SignatureAlgorithmMismatch {
expected: String,
found: String,
},
SignatureKeyMismatch {
expected: String,
found: String,
},
SignatureMismatch {
expected: String,
found: String,
},
ResumeInputHashMismatch {
handle: String,
expected_hash: String,
actual_hash: String,
},
DrainDecisionPromptHashMismatch {
item_id: String,
expected_hash: String,
actual_hash: String,
},
Persistence(String),
}
impl std::fmt::Display for LifecycleReceiptError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SignatureAlgorithmMismatch { expected, found } => write!(
f,
"HARN-SUS-013 lifecycle signature algorithm mismatch (expected {expected}, found {found})"
),
Self::SignatureKeyMismatch { expected, found } => write!(
f,
"HARN-SUS-013 lifecycle signature key mismatch (expected {expected}, found {found})"
),
Self::SignatureMismatch { expected, found } => write!(
f,
"HARN-SUS-013 lifecycle signature mismatch (expected {expected}, found {found})"
),
Self::ResumeInputHashMismatch {
handle,
expected_hash,
actual_hash,
} => write!(
f,
"HARN-SUS-011 replay resume input hash mismatch for {handle} (expected {expected_hash}, got {actual_hash})"
),
Self::DrainDecisionPromptHashMismatch {
item_id,
expected_hash,
actual_hash,
} => write!(
f,
"HARN-SUS-012 replay drain decision prompt hash mismatch for {item_id} (expected {expected_hash}, got {actual_hash})"
),
Self::Persistence(message) => write!(f, "lifecycle receipt persistence: {message}"),
}
}
}
impl std::error::Error for LifecycleReceiptError {}
pub fn hash_resume_input(input: Option<&JsonValue>) -> String {
let canonical = canonical_json_bytes(input.unwrap_or(&JsonValue::Null));
let digest = Sha256::digest(&canonical);
format!("sha256:{}", hex::encode(digest))
}
pub fn hash_drain_decision_prompt(prompt: &str) -> String {
let digest = Sha256::digest(prompt.as_bytes());
format!("sha256:{}", hex::encode(digest))
}
fn canonical_json_bytes(value: &JsonValue) -> Vec<u8> {
let canonical = canonicalize_for_hash(value);
serde_json::to_vec(&canonical).unwrap_or_default()
}
fn canonicalize_for_hash(value: &JsonValue) -> JsonValue {
match value {
JsonValue::Object(map) => {
let mut sorted = serde_json::Map::new();
let mut keys: Vec<&String> = map.keys().collect();
keys.sort();
for key in keys {
if let Some(v) = map.get(key) {
sorted.insert(key.clone(), canonicalize_for_hash(v));
}
}
JsonValue::Object(sorted)
}
JsonValue::Array(items) => {
JsonValue::Array(items.iter().map(canonicalize_for_hash).collect())
}
other => other.clone(),
}
}
#[derive(Clone, Debug, Default)]
pub struct RedactionPolicy {
pub paths: Vec<RedactionPath>,
}
#[derive(Clone, Debug)]
pub struct RedactionPath {
pub pointer: String,
pub reason: String,
}
impl RedactionPolicy {
pub fn redact(&self, value: &JsonValue) -> JsonValue {
let mut working = value.clone();
for rule in &self.paths {
apply_redaction(&mut working, &rule.pointer, &rule.reason);
}
working
}
}
fn apply_redaction(value: &mut JsonValue, pointer: &str, reason: &str) {
if pointer.is_empty() || pointer == "/" {
*value = serde_json::json!({"$harn_redacted": reason});
return;
}
let segments: Vec<&str> = pointer
.strip_prefix('/')
.unwrap_or(pointer)
.split('/')
.collect();
redact_segments(value, &segments, reason);
}
fn redact_segments(value: &mut JsonValue, segments: &[&str], reason: &str) {
if segments.is_empty() {
*value = serde_json::json!({"$harn_redacted": reason});
return;
}
let head = segments[0];
let tail = &segments[1..];
match value {
JsonValue::Object(map) => {
if let Some(child) = map.get_mut(head) {
redact_segments(child, tail, reason);
}
}
JsonValue::Array(items) => {
if let Ok(index) = head.parse::<usize>() {
if let Some(child) = items.get_mut(index) {
redact_segments(child, tail, reason);
}
}
}
_ => {}
}
}
thread_local! {
static LIFECYCLE_RECEIPT_LOG: RefCell<Vec<LifecycleReceiptEntry>> = const { RefCell::new(Vec::new()) };
static LIFECYCLE_RECEIPT_SEQ: RefCell<u64> = const { RefCell::new(0) };
}
pub fn reset_lifecycle_receipt_registry() {
LIFECYCLE_RECEIPT_LOG.with(|log| log.borrow_mut().clear());
LIFECYCLE_RECEIPT_SEQ.with(|seq| *seq.borrow_mut() = 0);
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct LifecycleReceiptEntry {
pub seq: u64,
pub kind: String,
pub payload: JsonValue,
}
impl LifecycleReceiptEntry {
pub fn to_json(&self) -> JsonValue {
serde_json::json!({
"seq": self.seq,
"kind": &self.kind,
"payload": &self.payload,
})
}
}
fn next_seq() -> u64 {
LIFECYCLE_RECEIPT_SEQ.with(|seq| {
let mut slot = seq.borrow_mut();
*slot += 1;
*slot
})
}
fn record_entry(kind: &str, payload: JsonValue) -> LifecycleReceiptEntry {
let entry = LifecycleReceiptEntry {
seq: next_seq(),
kind: kind.to_string(),
payload,
};
LIFECYCLE_RECEIPT_LOG.with(|log| log.borrow_mut().push(entry.clone()));
persist_entry(&entry);
entry
}
fn persist_entry(entry: &LifecycleReceiptEntry) {
let Some(log) = active_event_log() else {
return;
};
let Ok(topic) = Topic::new(LIFECYCLE_RECEIPT_TOPIC) else {
return;
};
let mut headers = BTreeMap::new();
headers.insert("kind".to_string(), entry.kind.clone());
headers.insert("seq".to_string(), entry.seq.to_string());
let event = LogEvent::new(entry.kind.clone(), entry.payload.clone()).with_headers(headers);
let _ = futures::executor::block_on(log.append(&topic, event));
}
pub fn record_suspension_receipt(receipt: &SuspensionReceipt) -> LifecycleReceiptEntry {
let payload = serde_json::to_value(receipt).unwrap_or(JsonValue::Null);
record_entry(SUSPENSION_RECEIPT_KIND, payload)
}
pub fn record_resumption_receipt(receipt: &ResumptionReceipt) -> LifecycleReceiptEntry {
let payload = serde_json::to_value(receipt).unwrap_or(JsonValue::Null);
record_entry(RESUMPTION_RECEIPT_KIND, payload)
}
pub fn record_drain_decision_receipt(receipt: &DrainDecisionReceipt) -> LifecycleReceiptEntry {
let payload = serde_json::to_value(receipt).unwrap_or(JsonValue::Null);
record_entry(DRAIN_DECISION_RECEIPT_KIND, payload)
}
pub fn lifecycle_receipts_snapshot() -> Vec<LifecycleReceiptEntry> {
LIFECYCLE_RECEIPT_LOG.with(|log| log.borrow().clone())
}
pub fn replay_resume_input(
receipt: &ResumptionReceipt,
candidate_input: Option<&JsonValue>,
) -> Result<Option<JsonValue>, LifecycleReceiptError> {
receipt.verify_signature()?;
let actual = hash_resume_input(candidate_input);
if actual != receipt.input_hash {
return Err(LifecycleReceiptError::ResumeInputHashMismatch {
handle: receipt.handle.clone(),
expected_hash: receipt.input_hash.clone(),
actual_hash: actual,
});
}
Ok(receipt.input.clone())
}
pub fn replay_drain_decision(
receipt: &DrainDecisionReceipt,
candidate_prompt: Option<&str>,
) -> Result<DrainAction, LifecycleReceiptError> {
receipt.verify_signature()?;
if let (Some(prompt), Some(expected)) = (candidate_prompt, receipt.prompt_hash.as_ref()) {
let actual = hash_drain_decision_prompt(prompt);
if &actual != expected {
return Err(LifecycleReceiptError::DrainDecisionPromptHashMismatch {
item_id: receipt.item.id.clone(),
expected_hash: expected.clone(),
actual_hash: actual,
});
}
}
Ok(receipt.action)
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
fn fresh() {
reset_lifecycle_receipt_registry();
}
#[test]
fn suspension_receipt_signs_and_verifies() {
fresh();
let receipt = SuspensionReceipt::new(
"worker://triage/42",
Some("session-1".to_string()),
SuspendInitiator::Operator,
"operator-1",
"waiting for human approval",
Some(json!({"kind": "approval"})),
Some("span-1".to_string()),
);
receipt
.verify_signature()
.expect("signed timestamp verifies");
let mut tampered = receipt.clone();
tampered.suspended_at.at_ms += 1;
assert!(matches!(
tampered.verify_signature(),
Err(LifecycleReceiptError::SignatureMismatch { .. })
));
}
#[test]
fn resumption_receipt_round_trips_input_hash() {
fresh();
let original = json!({"approved": true, "comment": "ship it"});
let receipt = ResumptionReceipt::new(
"worker://triage/42",
Some("session-1".to_string()),
ResumeInitiator::Operator,
"operator-1",
Some(&original),
Some(original.clone()),
true,
None,
None,
);
let cached = replay_resume_input(&receipt, Some(&original)).expect("matches");
assert_eq!(cached, Some(original.clone()));
let drift = json!({"approved": false, "comment": "ship it"});
let mismatch = replay_resume_input(&receipt, Some(&drift));
assert!(matches!(
mismatch,
Err(LifecycleReceiptError::ResumeInputHashMismatch { .. })
));
}
#[test]
fn resumption_hash_is_canonical_across_map_key_order() {
let a = json!({"a": 1, "b": 2});
let b = json!({"b": 2, "a": 1});
assert_eq!(hash_resume_input(Some(&a)), hash_resume_input(Some(&b)));
}
#[test]
fn redaction_policy_preserves_hash() {
let original = json!({
"user": "alice",
"secret_token": "very-secret",
"approved": true,
});
let policy = RedactionPolicy {
paths: vec![RedactionPath {
pointer: "/secret_token".to_string(),
reason: "auth_token".to_string(),
}],
};
let redacted = policy.redact(&original);
assert_ne!(redacted, original);
assert_eq!(
redacted["secret_token"],
json!({"$harn_redacted": "auth_token"})
);
let receipt = ResumptionReceipt::new(
"worker://x",
None,
ResumeInitiator::Operator,
"op-1",
Some(&original),
Some(redacted.clone()),
true,
None,
None,
);
let cached = replay_resume_input(&receipt, Some(&original)).expect("matches");
assert_eq!(cached, Some(redacted));
}
#[test]
fn drain_decision_receipt_memoizes_action() {
fresh();
let prompt = "settle this drain item";
let receipt = DrainDecisionReceipt::new(
"pipeline-1",
DrainItem {
category: DrainItemCategory::SuspendedSubagent,
id: "worker://triage/42".to_string(),
summary: "worker is suspended".to_string(),
},
DrainAction::Resume,
"settlement agent picked resume".to_string(),
"settlement-session-1",
Some(hash_drain_decision_prompt(prompt)),
);
let action = replay_drain_decision(&receipt, Some(prompt)).expect("matches");
assert_eq!(action, DrainAction::Resume);
let drift = replay_drain_decision(&receipt, Some("a different prompt"));
assert!(matches!(
drift,
Err(LifecycleReceiptError::DrainDecisionPromptHashMismatch { .. })
));
}
#[test]
fn record_then_snapshot_byte_identical_across_runs() {
fresh();
let s = SuspensionReceipt::new(
"worker://x",
None,
SuspendInitiator::Operator,
"op-1",
"reason",
None,
None,
);
let entry_a = record_suspension_receipt(&s);
fresh();
let entry_b = record_suspension_receipt(&s);
assert_eq!(entry_a.seq, entry_b.seq);
assert_eq!(entry_a.kind, entry_b.kind);
}
#[test]
fn snapshot_returns_recorded_entries_in_seq_order() {
fresh();
let s = SuspensionReceipt::new(
"worker://x",
None,
SuspendInitiator::Operator,
"op-1",
"reason",
None,
None,
);
record_suspension_receipt(&s);
let r = ResumptionReceipt::new(
"worker://x",
None,
ResumeInitiator::Operator,
"op-1",
None,
None,
true,
None,
None,
);
record_resumption_receipt(&r);
let snapshot = lifecycle_receipts_snapshot();
assert_eq!(snapshot.len(), 2);
assert_eq!(snapshot[0].seq, 1);
assert_eq!(snapshot[0].kind, SUSPENSION_RECEIPT_KIND);
assert_eq!(snapshot[1].seq, 2);
assert_eq!(snapshot[1].kind, RESUMPTION_RECEIPT_KIND);
}
}