use crate::{
InternalError, InternalErrorOrigin,
dto::{
error::Error,
icp_refill::{IcpRefillMode, IcpRefillRequest, IcpRefillResponse},
},
ops::{
ic::IcOps,
replay::{
self as replay_ops, ICP_REFILL_REPLAY_RESPONSE_SCHEMA_VERSION,
model::{
CommandKind, ExternalEffectDescriptor, OperationId, RecoveryReason, ReplayActor,
ReplayPayloadHasher, ReplayReceipt,
},
receipt::{
ReplayReceiptDecision, ReplayReceiptReserveInput, ReplayReceiptStoreError,
ReplayReceiptToken, abort_uncommitted_receipt, commit_receipt_response,
mark_external_effect_in_flight, mark_recovery_required, reserve_or_replay_receipt,
},
},
storage::icp_refill::IcpRefillStoreOps,
},
view::icp_refill::IcpRefillOperation,
workflow::{
ic::icp_refill::{
ICP_REFILL_REPLAY_COMMAND_KIND,
cost_guard::{complete_icp_refill_cost_guard, recover_icp_refill_cost_guard},
},
prelude::*,
},
};
#[derive(Debug)]
pub(super) enum IcpRefillReplayReservation {
Fresh {
operation_id: [u8; 32],
token: Box<ReplayReceiptToken>,
},
Replay(IcpRefillResponse),
}
pub(super) fn icp_refill_replay_reserve_input(
request: &IcpRefillRequest,
caller: Principal,
now_ns: u64,
) -> ReplayReceiptReserveInput {
let command_kind = icp_refill_command_kind();
let actor = icp_refill_replay_actor(caller);
let payload_hash = icp_refill_payload_hash(&command_kind, &actor, request);
ReplayReceiptReserveInput::new(
command_kind,
icp_refill_operation_id(request),
actor,
payload_hash,
now_ns,
)
}
pub(super) fn reserve_icp_refill_replay(
input: ReplayReceiptReserveInput,
) -> Result<IcpRefillReplayReservation, InternalError> {
let operation_id = input.operation_id.into_bytes();
match reserve_or_replay_receipt(input).map_err(map_icp_refill_replay_store_error)? {
ReplayReceiptDecision::Fresh(token) => Ok(IcpRefillReplayReservation::Fresh {
operation_id,
token: Box::new(token),
}),
ReplayReceiptDecision::ReturnCommitted(receipt) => {
decode_icp_refill_replay_response(&receipt).map(IcpRefillReplayReservation::Replay)
}
ReplayReceiptDecision::OperationInProgress => {
log_icp_refill_replay_conflict(operation_id, "operation_in_progress");
Err(InternalError::public(Error::conflict(
"ICP refill request is already in progress; retry later with the same operation id",
)))
}
ReplayReceiptDecision::ActorMismatch => {
log_icp_refill_replay_conflict(operation_id, "actor_mismatch");
Err(InternalError::public(Error::conflict(
"ICP refill operation id was reused by a different caller",
)))
}
ReplayReceiptDecision::PayloadMismatch => {
log_icp_refill_replay_conflict(operation_id, "payload_mismatch");
Err(InternalError::public(Error::conflict(
"ICP refill operation id was reused with a different payload",
)))
}
ReplayReceiptDecision::Expired => {
log_icp_refill_replay_conflict(operation_id, "expired");
Err(InternalError::public(Error::conflict(
"ICP refill replay receipt expired; retry with a new operation id",
)))
}
ReplayReceiptDecision::RecoveryRequired(reason) => {
log_icp_refill_replay_conflict(operation_id, "recovery_required");
Err(InternalError::public(Error::conflict(format!(
"ICP refill request requires recovery before replay: {reason:?}"
))))
}
ReplayReceiptDecision::TerminalFailed {
error_code,
error_bytes,
error_bytes_truncated,
} => {
log_icp_refill_replay_conflict(operation_id, "terminal_failed");
Err(InternalError::public(Error::conflict(format!(
"ICP refill request previously failed: {error_code:?}; error_bytes_len={}; truncated={error_bytes_truncated}",
error_bytes.len()
))))
}
ReplayReceiptDecision::PendingActorQuotaExceeded { max_pending, .. } => {
log_icp_refill_replay_conflict(operation_id, "pending_actor_quota_exceeded");
Err(InternalError::public(Error::exhausted(format!(
"ICP refill pending replay receipt quota exceeded for caller; max_pending={max_pending}"
))))
}
ReplayReceiptDecision::PendingCommandQuotaExceeded { max_pending, .. } => {
log_icp_refill_replay_conflict(operation_id, "pending_command_quota_exceeded");
Err(InternalError::public(Error::exhausted(format!(
"ICP refill pending replay receipt quota exceeded for command kind; max_pending={max_pending}"
))))
}
}
}
pub(super) fn finish_icp_refill_replay(
token: &ReplayReceiptToken,
operation: &IcpRefillOperation,
response: &IcpRefillResponse,
cost_permit: Option<&crate::ops::cost_guard::CostGuardPermit>,
) -> Result<(), InternalError> {
if IcpRefillStoreOps::is_resumable(operation) {
recover_icp_refill_cost_guard(cost_permit);
log_icp_refill_resumable_abort(operation);
abort_uncommitted_receipt(token);
return Ok(());
}
let response_bytes = match encode_icp_refill_replay_response(response) {
Ok(response_bytes) => response_bytes,
Err(err) => {
recover_icp_refill_cost_guard(cost_permit);
mark_recovery_required(
token,
RecoveryReason::ResponseCommitFailed,
IcOps::now_nanos(),
);
return Err(err);
}
};
commit_receipt_response(
token,
ICP_REFILL_REPLAY_RESPONSE_SCHEMA_VERSION,
response_bytes,
IcOps::now_nanos(),
);
complete_icp_refill_cost_guard(cost_permit);
log_icp_refill_commit(operation);
Ok(())
}
pub(super) fn mark_icp_refill_transfer_effect(
token: &ReplayReceiptToken,
operation: &IcpRefillOperation,
) {
mark_external_effect_in_flight(
token,
ExternalEffectDescriptor::IcpTransfer {
operation_id: OperationId::from_bytes(operation.operation_id),
},
IcOps::now_nanos(),
);
crate::log!(
crate::log::Topic::Cycles,
Info,
"icp refill replay effect marked effect=ledger_transfer command_kind={} operation_id={} record_id={} source={} target={} amount_e8s={}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(operation.operation_id),
operation.id,
operation.source_canister,
operation.target_canister,
operation.amount_e8s
);
}
pub(super) fn mark_icp_refill_notify_effect(
token: &ReplayReceiptToken,
operation: &IcpRefillOperation,
) {
mark_external_effect_in_flight(
token,
ExternalEffectDescriptor::ManagementCall {
canister: operation.cmc_canister_id,
method: "notify_top_up".to_string(),
},
IcOps::now_nanos(),
);
crate::log!(
crate::log::Topic::Cycles,
Info,
"icp refill replay effect marked effect=cmc_notify_top_up command_kind={} operation_id={} record_id={} source={} target={} amount_e8s={}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(operation.operation_id),
operation.id,
operation.source_canister,
operation.target_canister,
operation.amount_e8s
);
}
pub(super) fn mark_icp_refill_recovery_required(
token: &ReplayReceiptToken,
operation: &IcpRefillOperation,
effect: &'static str,
err: &InternalError,
) {
let (error_class, error_origin) = err.log_fields();
mark_recovery_required(
token,
RecoveryReason::ExternalEffectStatusUnknown,
IcOps::now_nanos(),
);
crate::log!(
crate::log::Topic::Cycles,
Error,
"icp refill replay recovery required effect={} command_kind={} operation_id={} record_id={} source={} target={} amount_e8s={} error_class={} error_origin={}",
effect,
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(operation.operation_id),
operation.id,
operation.source_canister,
operation.target_canister,
operation.amount_e8s,
error_class,
error_origin
);
}
pub(super) fn log_icp_refill_fresh_reservation(request: &IcpRefillRequest) {
crate::log!(
crate::log::Topic::Cycles,
Info,
"icp refill replay receipt reserved command_kind={} operation_id={} source={} target={} amount_e8s={}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(request.operation_id),
request.source_canister,
request.target_canister,
request.amount_e8s
);
}
pub(super) fn log_icp_refill_committed_replay(response: &IcpRefillResponse) {
crate::log!(
crate::log::Topic::Cycles,
Info,
"icp refill committed replay returned command_kind={} operation_id={} status={:?}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(response.operation_id),
response.status
);
}
pub(super) fn log_icp_refill_replay_conflict(operation_id: [u8; 32], decision: &'static str) {
crate::log!(
crate::log::Topic::Cycles,
Warn,
"icp refill replay decision blocked command_kind={} operation_id={} decision={}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(operation_id),
decision
);
}
pub(super) fn log_icp_refill_resumable_abort(operation: &IcpRefillOperation) {
crate::log!(
crate::log::Topic::Cycles,
Info,
"icp refill replay receipt aborted for resumable record command_kind={} operation_id={} record_id={} source={} target={} amount_e8s={} status={:?}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(operation.operation_id),
operation.id,
operation.source_canister,
operation.target_canister,
operation.amount_e8s,
operation.status
);
}
pub(super) fn operation_id_display(operation_id: [u8; 32]) -> String {
OperationId::from_bytes(operation_id).to_string()
}
pub(super) fn log_icp_refill_commit(operation: &IcpRefillOperation) {
crate::log!(
crate::log::Topic::Cycles,
Ok,
"icp refill replay response committed command_kind={} operation_id={} record_id={} source={} target={} amount_e8s={} status={:?}",
ICP_REFILL_REPLAY_COMMAND_KIND,
operation_id_display(operation.operation_id),
operation.id,
operation.source_canister,
operation.target_canister,
operation.amount_e8s,
operation.status
);
}
pub(super) const fn icp_refill_operation_id(request: &IcpRefillRequest) -> OperationId {
OperationId::from_bytes(request.operation_id)
}
pub(super) fn icp_refill_command_kind() -> CommandKind {
CommandKind::new(ICP_REFILL_REPLAY_COMMAND_KIND)
.expect("ICP refill replay command kind is a valid static label")
}
pub(super) const fn icp_refill_replay_actor(caller: Principal) -> ReplayActor {
ReplayActor::direct_caller(caller)
}
pub(super) fn icp_refill_payload_hash(
command_kind: &CommandKind,
actor: &ReplayActor,
request: &IcpRefillRequest,
) -> [u8; 32] {
let mut hasher = ReplayPayloadHasher::new(command_kind, actor);
hasher.hash_str("IcpRefill");
hasher.hash_principal(&request.source_canister);
hash_optional_subaccount(&mut hasher, request.source_subaccount);
hasher.hash_principal(&request.target_canister);
hasher.hash_u64(request.amount_e8s);
hasher.hash_str(icp_refill_mode_label(request.mode));
hasher.finish()
}
fn encode_icp_refill_replay_response(
response: &IcpRefillResponse,
) -> Result<Vec<u8>, InternalError> {
replay_ops::encode_icp_refill_replay_response(response).map_err(|err| match err {
replay_ops::ReplayCommitError::EncodeFailed(message) => InternalError::workflow(
InternalErrorOrigin::Workflow,
format!("failed to encode ICP refill replay response: {message}"),
),
})
}
fn decode_icp_refill_replay_response(
receipt: &ReplayReceipt,
) -> Result<IcpRefillResponse, InternalError> {
replay_ops::decode_icp_refill_replay_response(receipt).map_err(|err| match err {
replay_ops::ReplayDecodeError::DecodeFailed(message) => {
InternalError::workflow(InternalErrorOrigin::Workflow, message)
}
})
}
fn map_icp_refill_replay_store_error(err: ReplayReceiptStoreError) -> InternalError {
match err {
ReplayReceiptStoreError::ReceiptDecodeFailed(message) => InternalError::workflow(
InternalErrorOrigin::Workflow,
format!("failed to decode ICP refill replay receipt: {message}"),
),
}
}
fn hash_optional_subaccount(hasher: &mut ReplayPayloadHasher, subaccount: Option<[u8; 32]>) {
hasher.hash_bool(subaccount.is_some());
if let Some(subaccount) = subaccount {
hasher.hash_bytes(&subaccount);
}
}
const fn icp_refill_mode_label(mode: IcpRefillMode) -> &'static str {
match mode {
IcpRefillMode::Canister => "canister",
IcpRefillMode::Fabricate => "fabricate",
}
}