use crate::{
InternalError,
cdk::{candid::Nat, icrc_ledger_types::icrc1::transfer::TransferError, types::Principal},
dto::icp_refill::{IcpRefillErrorCode, IcpRefillRequest, IcpRefillResponse, IcpRefillStatus},
ids::CanisterRole,
infra::ic::icp_refill::{NotifyTopUpArg, NotifyTopUpError},
ops::{
cost_guard::CostGuardPermit,
ic::{IcOps, icp_refill::IcpRefillOps},
replay::receipt::{ReplayReceiptToken, abort_reserved_receipt},
runtime::cycles_funding::CyclesFundingLedgerOps,
storage::{
children::CanisterChildrenOps,
icp_refill::{IcpRefillOperationCreateInput, IcpRefillStoreOps},
},
},
view::icp_refill::IcpRefillOperation,
workflow::ic::icp_refill::{
MAX_NOTIFY_ATTEMPTS, RateQueryMode, TX_WINDOW_NANOS,
cost_guard::{
recover_icp_refill_cost_guard, require_icp_refill_cost_permit,
reserve_icp_refill_cost_guard_if_needed,
},
prepare_context,
replay::{
finish_icp_refill_replay, mark_icp_refill_notify_effect,
mark_icp_refill_recovery_required, mark_icp_refill_transfer_effect,
},
},
};
pub(super) async fn execute_fresh_manual_refill(
request: IcpRefillRequest,
operation_id: [u8; 32],
token: &ReplayReceiptToken,
) -> Result<IcpRefillResponse, InternalError> {
let mut cost_permit = None;
let operation =
match execute_manual_refill_operation(request, operation_id, token, &mut cost_permit).await
{
Ok(operation) => operation,
Err(err) => {
recover_icp_refill_cost_guard(cost_permit.as_ref());
abort_reserved_receipt(token);
return Err(err);
}
};
let response = IcpRefillStoreOps::to_response(&operation);
if let Err(err) = finish_icp_refill_replay(token, &operation, &response, cost_permit.as_ref()) {
abort_reserved_receipt(token);
return Err(err);
}
Ok(response)
}
async fn execute_manual_refill_operation(
request: IcpRefillRequest,
operation_id: [u8; 32],
token: &ReplayReceiptToken,
cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
if let Some(operation) = IcpRefillStoreOps::find_by_operation_id(operation_id) {
IcpRefillStoreOps::validate_retry_request_matches_operation(&request, &operation)?;
return advance_operation(operation, token, cost_permit).await;
}
let context = prepare_context(&request, RateQueryMode::WhenGateConfigured).await?;
let cmc_account =
IcpRefillOps::cmc_topup_account(context.cmc_canister_id, request.target_canister)?;
let operation = IcpRefillStoreOps::create_or_get(IcpRefillOperationCreateInput {
operation_id,
source_canister: request.source_canister,
source_subaccount: request.source_subaccount,
target_canister: request.target_canister,
ledger_canister_id: context.ledger_canister_id,
cmc_canister_id: context.cmc_canister_id,
cmc_to_account_owner: cmc_account.owner,
cmc_to_account_subaccount: cmc_account.subaccount,
amount_e8s: request.amount_e8s,
fee_e8s: context.fee_e8s,
memo: IcpRefillOps::topup_memo(),
created_at_time_ns: context.created_at_time_ns,
now_ns: IcOps::now_nanos(),
})?;
advance_operation(operation, token, cost_permit).await
}
async fn transfer_operation(
operation: IcpRefillOperation,
token: &ReplayReceiptToken,
cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
let to = IcpRefillOps::cmc_topup_account(operation.cmc_canister_id, operation.target_canister)?;
let transfer_arg = IcpRefillOps::transfer_arg(
operation.source_subaccount,
to,
operation.amount_e8s,
operation.fee_e8s,
operation.memo.clone(),
operation.created_at_time_ns,
);
reserve_icp_refill_cost_guard_if_needed(token, &operation, cost_permit)?;
let cost_permit = require_icp_refill_cost_permit(cost_permit.as_ref())?;
mark_icp_refill_transfer_effect(token, &operation);
match IcpRefillOps::icrc1_transfer(cost_permit, operation.ledger_canister_id, transfer_arg)
.await
{
Err(err) => {
mark_icp_refill_recovery_required(token, &operation, "ledger_transfer", &err);
Err(err)
}
Ok(Ok(block_index)) => {
let block_index = match IcpRefillOps::checked_block_index(block_index) {
Ok(block_index) => block_index,
Err(err) => {
return IcpRefillStoreOps::mark_transfer_failed(
operation.id,
IcpRefillErrorCode::InvalidLedgerBlockIndex,
err.to_string(),
IcOps::now_nanos(),
);
}
};
IcpRefillStoreOps::mark_transferred(operation.id, block_index, IcOps::now_nanos())
}
Ok(Err(err)) => apply_transfer_error(operation.id, err),
}
}
async fn advance_operation(
operation: IcpRefillOperation,
token: &ReplayReceiptToken,
cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
let operation = match operation.status {
IcpRefillStatus::Requested => {
transfer_unless_window_stale(operation, token, cost_permit).await?
}
IcpRefillStatus::Transferred | IcpRefillStatus::NotifyProcessing => operation,
IcpRefillStatus::Failed if IcpRefillStoreOps::can_retry_notify(&operation) => operation,
IcpRefillStatus::Failed if IcpRefillStoreOps::can_retry_bad_fee(&operation) => {
transfer_unless_window_stale(operation, token, cost_permit).await?
}
IcpRefillStatus::Completed
| IcpRefillStatus::Failed
| IcpRefillStatus::InvalidTransaction
| IcpRefillStatus::Refunded
| IcpRefillStatus::TransactionTooOld => return Ok(operation),
};
if IcpRefillStoreOps::should_notify(&operation) {
notify_operation(operation, token, cost_permit).await
} else {
Ok(operation)
}
}
async fn transfer_unless_window_stale(
operation: IcpRefillOperation,
token: &ReplayReceiptToken,
cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
let now_ns = IcOps::now_nanos();
if IcpRefillStoreOps::transfer_window_stale(&operation, now_ns, TX_WINDOW_NANOS) {
IcpRefillStoreOps::mark_transfer_window_stale(operation.id, now_ns)
} else {
transfer_operation(operation, token, cost_permit).await
}
}
async fn notify_operation(
operation: IcpRefillOperation,
token: &ReplayReceiptToken,
cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
let Some(block_index) = operation.ledger_block_index else {
return IcpRefillStoreOps::mark_notify_failed(
operation.id,
"notify_top_up cannot run before ledger block is recorded".to_string(),
IcOps::now_nanos(),
);
};
let operation =
IcpRefillStoreOps::mark_notify_attempt_started(operation.id, IcOps::now_nanos())?;
let args = NotifyTopUpArg {
block_index,
canister_id: operation.target_canister,
};
reserve_icp_refill_cost_guard_if_needed(token, &operation, cost_permit)?;
let cost_permit = require_icp_refill_cost_permit(cost_permit.as_ref())?;
mark_icp_refill_notify_effect(token, &operation);
match IcpRefillOps::notify_top_up(cost_permit, operation.cmc_canister_id, args).await {
Ok(Ok(cycles_sent)) => {
let operation =
IcpRefillStoreOps::mark_completed(operation.id, cycles_sent, IcOps::now_nanos())?;
record_direct_child_refill_grant(&operation, IcOps::now_secs());
Ok(operation)
}
Ok(Err(err)) => apply_notify_error(operation.id, operation.notify_attempts, err),
Err(err) => {
mark_icp_refill_recovery_required(token, &operation, "cmc_notify_top_up", &err);
Err(err)
}
}
}
pub(super) fn apply_transfer_error(
record_id: u64,
err: TransferError,
) -> Result<IcpRefillOperation, InternalError> {
match err {
TransferError::BadFee { expected_fee } => {
let expected_fee_e8s = match crate::workflow::ic::icp_refill::checked_nat_u64(
"bad_fee.expected_fee",
expected_fee,
) {
Ok(expected_fee_e8s) => expected_fee_e8s,
Err(err) => {
return IcpRefillStoreOps::mark_transfer_failed(
record_id,
IcpRefillErrorCode::BadFee,
err.to_string(),
IcOps::now_nanos(),
);
}
};
IcpRefillStoreOps::mark_bad_fee(
record_id,
expected_fee_e8s,
format!("bad fee; expected {expected_fee_e8s}"),
IcOps::now_nanos(),
)
}
TransferError::Duplicate { duplicate_of } => {
let duplicate_of = match IcpRefillOps::checked_block_index(duplicate_of) {
Ok(block_index) => block_index,
Err(err) => {
return IcpRefillStoreOps::mark_transfer_failed(
record_id,
IcpRefillErrorCode::InvalidLedgerBlockIndex,
err.to_string(),
IcOps::now_nanos(),
);
}
};
IcpRefillStoreOps::mark_duplicate_transferred(
record_id,
duplicate_of,
IcOps::now_nanos(),
)
}
TransferError::TooOld => {
IcpRefillStoreOps::mark_transfer_window_stale(record_id, IcOps::now_nanos())
}
other => IcpRefillStoreOps::mark_transfer_failed(
record_id,
IcpRefillErrorCode::LedgerTransferFailed,
other.to_string(),
IcOps::now_nanos(),
),
}
}
pub(super) fn apply_notify_error(
record_id: u64,
notify_attempts: u32,
err: NotifyTopUpError,
) -> Result<IcpRefillOperation, InternalError> {
match err {
NotifyTopUpError::Refunded {
block_index,
reason,
} => IcpRefillStoreOps::mark_refunded(record_id, block_index, reason, IcOps::now_nanos()),
NotifyTopUpError::InvalidTransaction(reason) => {
IcpRefillStoreOps::mark_invalid_transaction(record_id, reason, IcOps::now_nanos())
}
NotifyTopUpError::Processing => mark_notify_processing(record_id, notify_attempts),
NotifyTopUpError::TransactionTooOld(min_block_index) => {
IcpRefillStoreOps::mark_transaction_too_old(
record_id,
Some(min_block_index),
IcOps::now_nanos(),
)
}
NotifyTopUpError::Other {
error_code,
error_message,
} => mark_retryable_notify_failure(
record_id,
notify_attempts,
format!("notify_top_up error {error_code}: {error_message}"),
),
}
}
pub(super) fn mark_notify_processing(
record_id: u64,
notify_attempts: u32,
) -> Result<IcpRefillOperation, InternalError> {
if notify_attempts >= MAX_NOTIFY_ATTEMPTS {
IcpRefillStoreOps::mark_notify_max_attempts(
record_id,
"notify_top_up returned Processing after max attempts".to_string(),
IcOps::now_nanos(),
)
} else {
IcpRefillStoreOps::mark_notify_processing(record_id, IcOps::now_nanos())
}
}
pub(super) fn mark_retryable_notify_failure(
record_id: u64,
notify_attempts: u32,
error_message: String,
) -> Result<IcpRefillOperation, InternalError> {
if notify_attempts >= MAX_NOTIFY_ATTEMPTS {
IcpRefillStoreOps::mark_notify_max_attempts(record_id, error_message, IcOps::now_nanos())
} else {
IcpRefillStoreOps::mark_notify_failed(record_id, error_message, IcOps::now_nanos())
}
}
fn record_direct_child_refill_grant(operation: &IcpRefillOperation, now_secs: u64) {
let Some(cycles_sent) = operation.cycles_sent.as_ref() else {
return;
};
let Some((_child_role, parent_pid)) =
CanisterChildrenOps::role_parent(operation.target_canister)
else {
return;
};
let Some((child, cycles)) = direct_child_refill_grant(operation, cycles_sent, parent_pid)
else {
return;
};
CyclesFundingLedgerOps::record_child_grant(child, cycles, now_secs);
}
pub(super) fn direct_child_refill_grant(
operation: &IcpRefillOperation,
cycles_sent: &Nat,
parent_pid: Option<Principal>,
) -> Option<(Principal, u128)> {
if !direct_child_refill_parent_matches(parent_pid, operation.source_canister) {
return None;
}
Some((
operation.target_canister,
IcpRefillStoreOps::nat_to_u128_saturating(cycles_sent),
))
}
pub(super) fn direct_child_refill_role(
target_canister: Principal,
source_canister: Principal,
) -> Option<CanisterRole> {
let (role, parent_pid) = CanisterChildrenOps::role_parent(target_canister)?;
if direct_child_refill_parent_matches(parent_pid, source_canister) {
Some(role)
} else {
None
}
}
fn direct_child_refill_parent_matches(
parent_pid: Option<Principal>,
source_canister: Principal,
) -> bool {
parent_pid == Some(source_canister)
}