#![allow(dead_code)]
use crate::{
ops::{
replay::model::{
ExternalEffectDescriptor, OperationId, REPLAY_PAYLOAD_HASH_SCHEMA_VERSION,
REPLAY_RECEIPT_SCHEMA_VERSION, ReplayActor, ReplayReceipt, ReplayReceiptStatus,
ReplayTerminalErrorCode, bounded_terminal_error_bytes,
},
storage::replay::ReplayReceiptOps,
},
storage::stable::replay::{ReplayReceiptRecord, ReplayReceiptSlotKey},
};
use super::model::{CommandKind, RecoveryReason};
/// Caller-supplied description of an operation, consumed by `reserve_or_replay_receipt`.
///
/// `command_kind` and `operation_id` select the storage slot. The remaining fields are
/// either written into a freshly reserved receipt or compared against the receipt
/// already stored in that slot.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplayReceiptReserveInput {
/// Command kind; passed with `operation_id` to `ReplayReceiptOps::slot_key`.
pub command_kind: CommandKind,
/// Operation identifier; passed with `command_kind` to `ReplayReceiptOps::slot_key`.
pub operation_id: OperationId,
/// Actor issuing the operation; a stored receipt with a different actor is reported
/// as `ReplayReceiptDecision::ActorMismatch`.
pub actor: ReplayActor,
/// Schema version of `payload_hash`; compared together with the hash itself.
pub payload_hash_schema_version: u32,
/// 32-byte payload hash; a difference from the stored receipt is reported as
/// `ReplayReceiptDecision::PayloadMismatch`.
pub payload_hash: [u8; 32],
/// Timestamp written to both `created_at_ns` and `updated_at_ns` of a fresh receipt
/// (presumably nanoseconds, going by the name — confirm against callers).
pub now_ns: u64,
}
impl ReplayReceiptReserveInput {
    /// Builds a reserve input stamped with the current payload-hash schema version.
    ///
    /// Every argument is stored unchanged in the field of the same name.
    /// `payload_hash_schema_version` is not a parameter: it is always set to
    /// [`REPLAY_PAYLOAD_HASH_SCHEMA_VERSION`].
    #[must_use]
    pub const fn new(
        command_kind: CommandKind,
        operation_id: OperationId,
        actor: ReplayActor,
        payload_hash: [u8; 32],
        now_ns: u64,
    ) -> Self {
        // The only field that does not come from the caller.
        let payload_hash_schema_version = REPLAY_PAYLOAD_HASH_SCHEMA_VERSION;

        Self {
            now_ns,
            payload_hash,
            payload_hash_schema_version,
            actor,
            operation_id,
            command_kind,
        }
    }
}
/// Handle produced by a fresh reservation (`ReplayReceiptDecision::Fresh`).
///
/// The state-transition functions in this module (`mark_external_effect_in_flight`,
/// `commit_receipt_response`, `commit_terminal_failure`, `mark_recovery_required`)
/// take a token by shared reference, clone `receipt`, modify the clone and store it
/// under `key`. The token itself is never mutated, so `receipt` stays the snapshot
/// taken at reservation time.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ReplayReceiptToken {
// Storage slot the receipt was written to.
key: ReplayReceiptSlotKey,
// Receipt exactly as it was stored when the slot was reserved.
receipt: ReplayReceipt,
}
impl ReplayReceiptToken {
    /// Returns a copy of the storage slot key held by this token.
    #[must_use]
    pub const fn key(&self) -> ReplayReceiptSlotKey {
        let Self { key, .. } = self;
        *key
    }

    /// Borrows the receipt snapshot held by this token.
    #[must_use]
    pub const fn receipt(&self) -> &ReplayReceipt {
        let Self { receipt, .. } = self;
        receipt
    }
}
/// Outcome of `reserve_or_replay_receipt`.
///
/// `Fresh` is returned when the slot was empty. Every other variant describes the
/// receipt that was already stored. The actor is checked before the payload, and
/// both are checked before the stored status is inspected.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ReplayReceiptDecision {
/// No receipt was stored for the slot; a new `Reserved` receipt has been written
/// and the token refers to it.
Fresh(ReplayReceiptToken),
/// The stored receipt has status `Committed`; carries that stored receipt.
ReturnCommitted(ReplayReceipt),
/// The stored receipt has status `Reserved` or `ExternalEffectInFlight`.
OperationInProgress,
/// The stored receipt's actor differs from the input's actor.
ActorMismatch,
/// The stored payload hash schema version or payload hash differs from the input's.
PayloadMismatch,
/// The stored receipt has status `RecoveryRequired`; carries its reason.
RecoveryRequired(RecoveryReason),
/// The stored receipt has status `TerminalFailed`; the fields are taken from that
/// stored status unchanged.
TerminalFailed {
error_code: ReplayTerminalErrorCode,
error_bytes: Vec<u8>,
error_bytes_truncated: bool,
},
}
/// Errors returned by `reserve_or_replay_receipt`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ReplayReceiptStoreError {
/// Converting a stored record back into a receipt (`into_receipt`) failed; carries
/// the error string produced by that conversion.
ReceiptDecodeFailed(String),
}
pub fn reserve_or_replay_receipt(
input: ReplayReceiptReserveInput,
) -> Result<ReplayReceiptDecision, ReplayReceiptStoreError> {
let key = ReplayReceiptOps::slot_key(&input.command_kind, input.operation_id);
let Some(existing) = ReplayReceiptOps::get(key) else {
let receipt = ReplayReceipt {
schema_version: REPLAY_RECEIPT_SCHEMA_VERSION,
command_kind: input.command_kind,
operation_id: input.operation_id,
actor: input.actor,
payload_hash_schema_version: input.payload_hash_schema_version,
payload_hash: input.payload_hash,
status: ReplayReceiptStatus::Reserved,
created_at_ns: input.now_ns,
updated_at_ns: input.now_ns,
response_schema_version: None,
response_bytes: None,
effect: None,
};
ReplayReceiptOps::upsert(key, ReplayReceiptRecord::from_receipt(receipt.clone()));
return Ok(ReplayReceiptDecision::Fresh(ReplayReceiptToken {
key,
receipt,
}));
};
let existing = existing
.into_receipt()
.map_err(ReplayReceiptStoreError::ReceiptDecodeFailed)?;
Ok(classify_existing_receipt(&input, existing))
}
pub fn mark_external_effect_in_flight(
token: &ReplayReceiptToken,
effect: ExternalEffectDescriptor,
now_ns: u64,
) {
let mut receipt = token.receipt.clone();
receipt.status = ReplayReceiptStatus::ExternalEffectInFlight;
receipt.effect = Some(effect);
receipt.updated_at_ns = now_ns;
ReplayReceiptOps::upsert(token.key, ReplayReceiptRecord::from_receipt(receipt));
}
pub fn commit_receipt_response(
token: &ReplayReceiptToken,
response_schema_version: u32,
response_bytes: Vec<u8>,
now_ns: u64,
) {
let mut receipt = token.receipt.clone();
receipt.status = ReplayReceiptStatus::Committed;
receipt.response_schema_version = Some(response_schema_version);
receipt.response_bytes = Some(response_bytes);
receipt.updated_at_ns = now_ns;
ReplayReceiptOps::upsert(token.key, ReplayReceiptRecord::from_receipt(receipt));
}
pub fn commit_terminal_failure(
token: &ReplayReceiptToken,
error_code: ReplayTerminalErrorCode,
error_bytes: &[u8],
now_ns: u64,
) {
let bounded = bounded_terminal_error_bytes(error_bytes);
let mut receipt = token.receipt.clone();
receipt.status = ReplayReceiptStatus::TerminalFailed {
error_code,
error_bytes: bounded.bytes,
error_bytes_truncated: bounded.truncated,
};
receipt.updated_at_ns = now_ns;
ReplayReceiptOps::upsert(token.key, ReplayReceiptRecord::from_receipt(receipt));
}
pub fn mark_recovery_required(token: &ReplayReceiptToken, reason: RecoveryReason, now_ns: u64) {
let mut receipt = token.receipt.clone();
receipt.status = ReplayReceiptStatus::RecoveryRequired { reason };
receipt.updated_at_ns = now_ns;
ReplayReceiptOps::upsert(token.key, ReplayReceiptRecord::from_receipt(receipt));
}
/// Maps an already-stored receipt to the decision returned for a repeated request.
///
/// Checks run in this order:
/// 1. a different actor yields `ActorMismatch`;
/// 2. a different payload hash schema version or payload hash yields `PayloadMismatch`;
/// 3. otherwise the stored status selects the decision.
fn classify_existing_receipt(
    input: &ReplayReceiptReserveInput,
    existing: ReplayReceipt,
) -> ReplayReceiptDecision {
    if existing.actor != input.actor {
        return ReplayReceiptDecision::ActorMismatch;
    }

    let same_payload = existing.payload_hash_schema_version == input.payload_hash_schema_version
        && existing.payload_hash == input.payload_hash;
    if !same_payload {
        return ReplayReceiptDecision::PayloadMismatch;
    }

    match existing.status {
        // Only this arm hands the whole stored receipt back to the caller.
        ReplayReceiptStatus::Committed => ReplayReceiptDecision::ReturnCommitted(existing),
        ReplayReceiptStatus::RecoveryRequired { reason } => {
            ReplayReceiptDecision::RecoveryRequired(reason)
        }
        ReplayReceiptStatus::TerminalFailed {
            error_code,
            error_bytes,
            error_bytes_truncated,
        } => ReplayReceiptDecision::TerminalFailed {
            error_code,
            error_bytes,
            error_bytes_truncated,
        },
        ReplayReceiptStatus::Reserved | ReplayReceiptStatus::ExternalEffectInFlight => {
            ReplayReceiptDecision::OperationInProgress
        }
    }
}
// Unit tests for the reserve / replay / commit flow.
//
// Every test begins with `ReplayReceiptOps::reset_for_tests()` (presumably clearing the
// receipt store so each test starts from an empty slot) and reuses the same `input()`,
// so repeated calls within a test target the same slot.
#[cfg(test)]
mod tests {
use super::*;
use crate::{cdk::types::Principal, ops::storage::replay::ReplayReceiptOps};
// Builds a principal from 29 copies of `id`, giving distinct actors per `id`.
fn p(id: u8) -> Principal {
Principal::from_slice(&[id; 29])
}
// Canonical reserve input shared by all tests: fixed command kind, operation id,
// actor `p(1)`, payload hash `[9; 32]` and timestamp 100.
fn input() -> ReplayReceiptReserveInput {
ReplayReceiptReserveInput::new(
CommandKind::new("test.command.v1").expect("command"),
OperationId::from_bytes([7; 32]),
ReplayActor::direct_caller(p(1)),
[9; 32],
100,
)
}
// A first call on an empty slot stores exactly one receipt and returns a token
// whose receipt is `Reserved`.
#[test]
fn reserve_or_replay_receipt_reserves_new_receipt() {
ReplayReceiptOps::reset_for_tests();
let decision = reserve_or_replay_receipt(input()).expect("decision");
let ReplayReceiptDecision::Fresh(token) = decision else {
panic!("expected fresh reservation");
};
assert_eq!(ReplayReceiptOps::len(), 1);
assert_eq!(token.receipt.status, ReplayReceiptStatus::Reserved);
}
// After a response is committed, an identical request gets the stored receipt back,
// including the committed response schema version and bytes.
#[test]
fn reserve_or_replay_receipt_returns_committed_response_for_duplicate() {
ReplayReceiptOps::reset_for_tests();
let token = match reserve_or_replay_receipt(input()).expect("reserve") {
ReplayReceiptDecision::Fresh(token) => token,
other => panic!("expected fresh, got {other:?}"),
};
commit_receipt_response(&token, 1, vec![1, 2, 3], 200);
let duplicate = reserve_or_replay_receipt(input()).expect("duplicate");
let ReplayReceiptDecision::ReturnCommitted(receipt) = duplicate else {
panic!("expected committed replay");
};
assert_eq!(receipt.response_schema_version, Some(1));
assert_eq!(receipt.response_bytes.as_deref(), Some(&[1, 2, 3][..]));
}
// A request for the same slot with a different actor, or with a different payload
// hash, is rejected with the corresponding mismatch decision.
#[test]
fn reserve_or_replay_receipt_rejects_actor_or_payload_mismatch() {
ReplayReceiptOps::reset_for_tests();
let _ = reserve_or_replay_receipt(input()).expect("reserve");
let mut actor_mismatch = input();
actor_mismatch.actor = ReplayActor::direct_caller(p(2));
assert_eq!(
reserve_or_replay_receipt(actor_mismatch).expect("actor mismatch"),
ReplayReceiptDecision::ActorMismatch
);
let mut payload_mismatch = input();
payload_mismatch.payload_hash = [8; 32];
assert_eq!(
reserve_or_replay_receipt(payload_mismatch).expect("payload mismatch"),
ReplayReceiptDecision::PayloadMismatch
);
}
// Duplicates are reported as in progress both while the receipt is merely reserved
// and after it has been marked as having an external effect in flight.
#[test]
fn reserve_or_replay_receipt_reports_in_progress_for_reserved_or_effect() {
ReplayReceiptOps::reset_for_tests();
let token = match reserve_or_replay_receipt(input()).expect("reserve") {
ReplayReceiptDecision::Fresh(token) => token,
other => panic!("expected fresh, got {other:?}"),
};
assert_eq!(
reserve_or_replay_receipt(input()).expect("reserved duplicate"),
ReplayReceiptDecision::OperationInProgress
);
mark_external_effect_in_flight(
&token,
ExternalEffectDescriptor::ManagementCreateCanister {
command_kind: CommandKind::new("test.command.v1").expect("command"),
},
150,
);
assert_eq!(
reserve_or_replay_receipt(input()).expect("in-flight duplicate"),
ReplayReceiptDecision::OperationInProgress
);
}
// An error payload one byte over the maximum is stored truncated to exactly the
// maximum, with the truncation flag set, and replays that way.
#[test]
fn terminal_failure_is_bounded_before_storage() {
ReplayReceiptOps::reset_for_tests();
let token = match reserve_or_replay_receipt(input()).expect("reserve") {
ReplayReceiptDecision::Fresh(token) => token,
other => panic!("expected fresh, got {other:?}"),
};
commit_terminal_failure(
&token,
ReplayTerminalErrorCode::ExecutionFailed,
&vec![7; super::super::model::MAX_REPLAY_TERMINAL_ERROR_BYTES + 1],
300,
);
let duplicate = reserve_or_replay_receipt(input()).expect("duplicate");
let ReplayReceiptDecision::TerminalFailed {
error_code,
error_bytes,
error_bytes_truncated,
} = duplicate
else {
panic!("expected terminal failure replay");
};
assert_eq!(error_code, ReplayTerminalErrorCode::ExecutionFailed);
assert_eq!(
error_bytes.len(),
super::super::model::MAX_REPLAY_TERMINAL_ERROR_BYTES
);
assert!(error_bytes_truncated);
}
}