canic-core 0.70.11

Canic — a canister orchestration and management toolkit for the Internet Computer
Documentation
//! Module: workflow::ic::icp_refill::execution
//!
//! Responsibility: execute ICP ledger transfers and CMC top-up notifications.
//! Does not own: endpoint authorization, stable record schemas, or pure policy.
//! Boundary: orchestrates ops/storage/replay after request preflight.

use crate::{
    InternalError,
    cdk::{candid::Nat, icrc_ledger_types::icrc1::transfer::TransferError, types::Principal},
    dto::icp_refill::{IcpRefillErrorCode, IcpRefillRequest, IcpRefillResponse, IcpRefillStatus},
    ids::CanisterRole,
    infra::ic::icp_refill::{NotifyTopUpArg, NotifyTopUpError},
    ops::{
        cost_guard::CostGuardPermit,
        ic::{IcOps, icp_refill::IcpRefillOps},
        replay::receipt::{ReplayReceiptToken, abort_reserved_receipt},
        runtime::cycles_funding::CyclesFundingLedgerOps,
        storage::{
            children::CanisterChildrenOps,
            icp_refill::{IcpRefillOperationCreateInput, IcpRefillStoreOps},
        },
    },
    view::icp_refill::IcpRefillOperation,
    workflow::ic::icp_refill::{
        MAX_NOTIFY_ATTEMPTS, RateQueryMode, TX_WINDOW_NANOS,
        cost_guard::{
            recover_icp_refill_cost_guard, require_icp_refill_cost_permit,
            reserve_icp_refill_cost_guard_if_needed,
        },
        prepare_context,
        replay::{
            finish_icp_refill_replay, mark_icp_refill_notify_effect,
            mark_icp_refill_recovery_required, mark_icp_refill_transfer_effect,
        },
    },
};

/// Runs a fresh manual refill and finalizes its replay receipt.
///
/// The refill itself is delegated to `execute_manual_refill_operation`; the
/// resulting operation is converted to a response and both are handed to
/// `finish_icp_refill_replay`.
///
/// # Errors
///
/// Propagates the error of the refill operation or of the replay finalization.
/// The reserved receipt is aborted on either failure; when the refill operation
/// itself fails, the cost guard is first passed to `recover_icp_refill_cost_guard`.
pub(super) async fn execute_fresh_manual_refill(
    request: IcpRefillRequest,
    operation_id: [u8; 32],
    token: &ReplayReceiptToken,
) -> Result<IcpRefillResponse, InternalError> {
    // Shared by mutable reference with the operation helpers and read back below
    // for recovery and finalization.
    let mut permit_slot = None;
    let outcome =
        execute_manual_refill_operation(request, operation_id, token, &mut permit_slot).await;

    let operation = match outcome {
        Ok(operation) => operation,
        Err(failure) => {
            recover_icp_refill_cost_guard(permit_slot.as_ref());
            abort_reserved_receipt(token);
            return Err(failure);
        }
    };

    let response = IcpRefillStoreOps::to_response(&operation);
    match finish_icp_refill_replay(token, &operation, &response, permit_slot.as_ref()) {
        Ok(_) => Ok(response),
        Err(failure) => {
            abort_reserved_receipt(token);
            Err(failure)
        }
    }
}

/// Resolves the stored operation for `operation_id` and advances it.
///
/// When a record already exists, the request is validated against it and the
/// existing record is resumed. Otherwise the request context is prepared, the
/// CMC top-up account is derived, and a record is obtained through
/// `IcpRefillStoreOps::create_or_get` before advancing.
///
/// # Errors
///
/// Propagates validation, context preparation, account derivation, store, and
/// advancement errors.
async fn execute_manual_refill_operation(
    request: IcpRefillRequest,
    operation_id: [u8; 32],
    token: &ReplayReceiptToken,
    cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
    let operation = match IcpRefillStoreOps::find_by_operation_id(operation_id) {
        Some(existing) => {
            // A retry must describe the same refill as the stored record.
            IcpRefillStoreOps::validate_retry_request_matches_operation(&request, &existing)?;
            existing
        }
        None => {
            let context = prepare_context(&request, RateQueryMode::WhenGateConfigured).await?;
            let cmc_account =
                IcpRefillOps::cmc_topup_account(context.cmc_canister_id, request.target_canister)?;
            let create_input = IcpRefillOperationCreateInput {
                operation_id,
                source_canister: request.source_canister,
                source_subaccount: request.source_subaccount,
                target_canister: request.target_canister,
                ledger_canister_id: context.ledger_canister_id,
                cmc_canister_id: context.cmc_canister_id,
                cmc_to_account_owner: cmc_account.owner,
                cmc_to_account_subaccount: cmc_account.subaccount,
                amount_e8s: request.amount_e8s,
                fee_e8s: context.fee_e8s,
                memo: IcpRefillOps::topup_memo(),
                created_at_time_ns: context.created_at_time_ns,
                now_ns: IcOps::now_nanos(),
            };
            IcpRefillStoreOps::create_or_get(create_input)?
        }
    };

    advance_operation(operation, token, cost_permit).await
}

/// Performs the ledger transfer for `operation` and records the outcome.
///
/// The transfer argument is built from the stored operation fields. Before the
/// ledger call, the cost guard is reserved if needed, a permit is required, and
/// the transfer effect is marked on the replay token.
///
/// # Errors
///
/// Returns the call error after marking the operation as recovery-required when
/// the `icrc1_transfer` call itself fails. Ledger-level rejections are routed
/// through `apply_transfer_error`; an unusable block index is recorded via
/// `mark_transfer_failed`. Store errors are propagated.
async fn transfer_operation(
    operation: IcpRefillOperation,
    token: &ReplayReceiptToken,
    cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
    let destination =
        IcpRefillOps::cmc_topup_account(operation.cmc_canister_id, operation.target_canister)?;
    let transfer_arg = IcpRefillOps::transfer_arg(
        operation.source_subaccount,
        destination,
        operation.amount_e8s,
        operation.fee_e8s,
        operation.memo.clone(),
        operation.created_at_time_ns,
    );

    // Order matters: reserve, obtain the permit, then mark the effect before calling out.
    reserve_icp_refill_cost_guard_if_needed(token, &operation, cost_permit)?;
    let permit = require_icp_refill_cost_permit(cost_permit.as_ref())?;
    mark_icp_refill_transfer_effect(token, &operation);

    let ledger_block = match IcpRefillOps::icrc1_transfer(
        permit,
        operation.ledger_canister_id,
        transfer_arg,
    )
    .await
    {
        Ok(Ok(ledger_block)) => ledger_block,
        Ok(Err(rejection)) => return apply_transfer_error(operation.id, rejection),
        Err(call_failure) => {
            mark_icp_refill_recovery_required(token, &operation, "ledger_transfer", &call_failure);
            return Err(call_failure);
        }
    };

    match IcpRefillOps::checked_block_index(ledger_block) {
        Ok(block_index) => {
            IcpRefillStoreOps::mark_transferred(operation.id, block_index, IcOps::now_nanos())
        }
        Err(invalid) => IcpRefillStoreOps::mark_transfer_failed(
            operation.id,
            IcpRefillErrorCode::InvalidLedgerBlockIndex,
            invalid.to_string(),
            IcOps::now_nanos(),
        ),
    }
}

/// Drives `operation` forward from its current status.
///
/// `Requested` operations, and `Failed` operations that qualify for a bad-fee
/// retry, go through the transfer step first. `Transferred`, `NotifyProcessing`,
/// and `Failed` operations that qualify for a notify retry skip straight to the
/// notify decision. Every other status is returned unchanged.
///
/// After the optional transfer step, the CMC is notified only when
/// `IcpRefillStoreOps::should_notify` says so.
///
/// # Errors
///
/// Propagates errors from the transfer and notify steps.
async fn advance_operation(
    operation: IcpRefillOperation,
    token: &ReplayReceiptToken,
    cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
    // The notify-retry guard is checked before the bad-fee-retry guard.
    let needs_transfer = match operation.status {
        IcpRefillStatus::Requested => true,
        IcpRefillStatus::Transferred | IcpRefillStatus::NotifyProcessing => false,
        IcpRefillStatus::Failed if IcpRefillStoreOps::can_retry_notify(&operation) => false,
        IcpRefillStatus::Failed if IcpRefillStoreOps::can_retry_bad_fee(&operation) => true,
        IcpRefillStatus::Completed
        | IcpRefillStatus::Failed
        | IcpRefillStatus::InvalidTransaction
        | IcpRefillStatus::Refunded
        | IcpRefillStatus::TransactionTooOld => return Ok(operation),
    };

    let operation = if needs_transfer {
        transfer_unless_window_stale(operation, token, cost_permit).await?
    } else {
        operation
    };

    if !IcpRefillStoreOps::should_notify(&operation) {
        return Ok(operation);
    }
    notify_operation(operation, token, cost_permit).await
}

/// Transfers for `operation` unless its transfer window is already stale.
///
/// Staleness is decided by `IcpRefillStoreOps::transfer_window_stale` against the
/// current time and `TX_WINDOW_NANOS`. A stale operation is marked as such with
/// the same timestamp instead of being sent to the ledger.
///
/// # Errors
///
/// Propagates errors from the store or from `transfer_operation`.
async fn transfer_unless_window_stale(
    operation: IcpRefillOperation,
    token: &ReplayReceiptToken,
    cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
    let now_ns = IcOps::now_nanos();
    let window_stale =
        IcpRefillStoreOps::transfer_window_stale(&operation, now_ns, TX_WINDOW_NANOS);

    if !window_stale {
        return transfer_operation(operation, token, cost_permit).await;
    }
    IcpRefillStoreOps::mark_transfer_window_stale(operation.id, now_ns)
}

/// Notifies the CMC about the recorded ledger block and stores the result.
///
/// An operation without a recorded ledger block index is marked as a notify
/// failure without calling out. Otherwise a notify attempt is recorded, the cost
/// guard is reserved if needed, a permit is required, and the notify effect is
/// marked on the replay token before `notify_top_up` is called.
///
/// On success the operation is marked completed and the grant is passed to
/// `record_direct_child_refill_grant`.
///
/// # Errors
///
/// Returns the call error after marking the operation as recovery-required when
/// the `notify_top_up` call itself fails. CMC-level rejections are routed through
/// `apply_notify_error`. Store errors are propagated.
async fn notify_operation(
    operation: IcpRefillOperation,
    token: &ReplayReceiptToken,
    cost_permit: &mut Option<CostGuardPermit>,
) -> Result<IcpRefillOperation, InternalError> {
    let Some(block_index) = operation.ledger_block_index else {
        return IcpRefillStoreOps::mark_notify_failed(
            operation.id,
            "notify_top_up cannot run before ledger block is recorded".to_string(),
            IcOps::now_nanos(),
        );
    };

    // Work from the record returned by the store, which reflects the started attempt.
    let started = IcpRefillStoreOps::mark_notify_attempt_started(operation.id, IcOps::now_nanos())?;
    let notify_arg = NotifyTopUpArg {
        block_index,
        canister_id: started.target_canister,
    };

    // Order matters: reserve, obtain the permit, then mark the effect before calling out.
    reserve_icp_refill_cost_guard_if_needed(token, &started, cost_permit)?;
    let permit = require_icp_refill_cost_permit(cost_permit.as_ref())?;
    mark_icp_refill_notify_effect(token, &started);

    let cycles_sent =
        match IcpRefillOps::notify_top_up(permit, started.cmc_canister_id, notify_arg).await {
            Ok(Ok(cycles_sent)) => cycles_sent,
            Ok(Err(rejection)) => {
                return apply_notify_error(started.id, started.notify_attempts, rejection);
            }
            Err(call_failure) => {
                mark_icp_refill_recovery_required(
                    token,
                    &started,
                    "cmc_notify_top_up",
                    &call_failure,
                );
                return Err(call_failure);
            }
        };

    let completed = IcpRefillStoreOps::mark_completed(started.id, cycles_sent, IcOps::now_nanos())?;
    record_direct_child_refill_grant(&completed, IcOps::now_secs());
    Ok(completed)
}

/// Records a ledger `TransferError` against the operation `record_id`.
///
/// - `BadFee`: stores the expected fee via `mark_bad_fee`, or marks the transfer
///   failed with `IcpRefillErrorCode::BadFee` when the fee does not pass
///   `checked_nat_u64`.
/// - `Duplicate`: stores the duplicate block via `mark_duplicate_transferred`, or
///   marks the transfer failed with `InvalidLedgerBlockIndex` when the index does
///   not pass `checked_block_index`.
/// - `TooOld`: marks the transfer window stale.
/// - Anything else: marks the transfer failed with `LedgerTransferFailed` and the
///   error's display text.
///
/// # Errors
///
/// Propagates whatever the store call returns.
pub(super) fn apply_transfer_error(
    record_id: u64,
    err: TransferError,
) -> Result<IcpRefillOperation, InternalError> {
    match err {
        TransferError::BadFee { expected_fee } => {
            match crate::workflow::ic::icp_refill::checked_nat_u64(
                "bad_fee.expected_fee",
                expected_fee,
            ) {
                Ok(expected_fee_e8s) => IcpRefillStoreOps::mark_bad_fee(
                    record_id,
                    expected_fee_e8s,
                    format!("bad fee; expected {expected_fee_e8s}"),
                    IcOps::now_nanos(),
                ),
                Err(invalid) => IcpRefillStoreOps::mark_transfer_failed(
                    record_id,
                    IcpRefillErrorCode::BadFee,
                    invalid.to_string(),
                    IcOps::now_nanos(),
                ),
            }
        }
        TransferError::Duplicate { duplicate_of } => {
            match IcpRefillOps::checked_block_index(duplicate_of) {
                Ok(original_block) => IcpRefillStoreOps::mark_duplicate_transferred(
                    record_id,
                    original_block,
                    IcOps::now_nanos(),
                ),
                Err(invalid) => IcpRefillStoreOps::mark_transfer_failed(
                    record_id,
                    IcpRefillErrorCode::InvalidLedgerBlockIndex,
                    invalid.to_string(),
                    IcOps::now_nanos(),
                ),
            }
        }
        TransferError::TooOld => {
            IcpRefillStoreOps::mark_transfer_window_stale(record_id, IcOps::now_nanos())
        }
        unhandled => IcpRefillStoreOps::mark_transfer_failed(
            record_id,
            IcpRefillErrorCode::LedgerTransferFailed,
            unhandled.to_string(),
            IcOps::now_nanos(),
        ),
    }
}

/// Records a CMC `NotifyTopUpError` against the operation `record_id`.
///
/// `Processing` and `Other` are handled by the attempt-aware helpers
/// `mark_notify_processing` and `mark_retryable_notify_failure`, which receive
/// `notify_attempts`. `Refunded`, `InvalidTransaction`, and `TransactionTooOld`
/// are written to the store through their dedicated `mark_*` calls.
///
/// # Errors
///
/// Propagates whatever the store call returns.
pub(super) fn apply_notify_error(
    record_id: u64,
    notify_attempts: u32,
    err: NotifyTopUpError,
) -> Result<IcpRefillOperation, InternalError> {
    match err {
        NotifyTopUpError::Processing => mark_notify_processing(record_id, notify_attempts),
        NotifyTopUpError::Other {
            error_code,
            error_message,
        } => {
            let detail = format!("notify_top_up error {error_code}: {error_message}");
            mark_retryable_notify_failure(record_id, notify_attempts, detail)
        }
        NotifyTopUpError::Refunded {
            block_index: refund_block,
            reason: refund_reason,
        } => IcpRefillStoreOps::mark_refunded(
            record_id,
            refund_block,
            refund_reason,
            IcOps::now_nanos(),
        ),
        NotifyTopUpError::InvalidTransaction(detail) => {
            IcpRefillStoreOps::mark_invalid_transaction(record_id, detail, IcOps::now_nanos())
        }
        NotifyTopUpError::TransactionTooOld(oldest_accepted) => {
            IcpRefillStoreOps::mark_transaction_too_old(
                record_id,
                Some(oldest_accepted),
                IcOps::now_nanos(),
            )
        }
    }
}

/// Records a `Processing` reply from `notify_top_up` for `record_id`.
///
/// While `notify_attempts` is below `MAX_NOTIFY_ATTEMPTS` the operation is marked
/// as notify-processing; at or above the limit it is marked as having reached the
/// maximum number of attempts.
///
/// # Errors
///
/// Propagates whatever the store call returns.
pub(super) fn mark_notify_processing(
    record_id: u64,
    notify_attempts: u32,
) -> Result<IcpRefillOperation, InternalError> {
    if notify_attempts < MAX_NOTIFY_ATTEMPTS {
        return IcpRefillStoreOps::mark_notify_processing(record_id, IcOps::now_nanos());
    }

    IcpRefillStoreOps::mark_notify_max_attempts(
        record_id,
        "notify_top_up returned Processing after max attempts".to_string(),
        IcOps::now_nanos(),
    )
}

/// Records a notify failure for `record_id` with `error_message`.
///
/// At or above `MAX_NOTIFY_ATTEMPTS` the operation is marked as having reached
/// the maximum number of attempts; below the limit it is marked as a notify
/// failure.
///
/// # Errors
///
/// Propagates whatever the store call returns.
pub(super) fn mark_retryable_notify_failure(
    record_id: u64,
    notify_attempts: u32,
    error_message: String,
) -> Result<IcpRefillOperation, InternalError> {
    let attempts_exhausted = notify_attempts >= MAX_NOTIFY_ATTEMPTS;
    let now_ns = IcOps::now_nanos();

    if attempts_exhausted {
        return IcpRefillStoreOps::mark_notify_max_attempts(record_id, error_message, now_ns);
    }
    IcpRefillStoreOps::mark_notify_failed(record_id, error_message, now_ns)
}

/// Records a child grant in the cycles funding ledger for a completed refill.
///
/// Nothing is recorded when the operation has no `cycles_sent`, when
/// `CanisterChildrenOps::role_parent` has no entry for the target canister, or
/// when `direct_child_refill_grant` yields no grant.
fn record_direct_child_refill_grant(operation: &IcpRefillOperation, now_secs: u64) {
    let grant = operation.cycles_sent.as_ref().and_then(|cycles_sent| {
        // The role is not needed here; only the registered parent is compared.
        let (_child_role, parent_pid) =
            CanisterChildrenOps::role_parent(operation.target_canister)?;
        direct_child_refill_grant(operation, cycles_sent, parent_pid)
    });

    if let Some((child, cycles)) = grant {
        CyclesFundingLedgerOps::record_child_grant(child, cycles, now_secs);
    }
}

/// Computes the `(child, cycles)` grant for a refill paid by the child's parent.
///
/// Returns `None` unless `parent_pid` equals the operation's source canister.
/// Otherwise returns the target canister together with `cycles_sent` converted by
/// `IcpRefillStoreOps::nat_to_u128_saturating`.
pub(super) fn direct_child_refill_grant(
    operation: &IcpRefillOperation,
    cycles_sent: &Nat,
    parent_pid: Option<Principal>,
) -> Option<(Principal, u128)> {
    direct_child_refill_parent_matches(parent_pid, operation.source_canister).then(|| {
        (
            operation.target_canister,
            IcpRefillStoreOps::nat_to_u128_saturating(cycles_sent),
        )
    })
}

/// Returns the role of `target_canister` when `source_canister` is its parent.
///
/// Yields `None` when `CanisterChildrenOps::role_parent` has no entry for the
/// target, or when the registered parent differs from `source_canister`.
pub(super) fn direct_child_refill_role(
    target_canister: Principal,
    source_canister: Principal,
) -> Option<CanisterRole> {
    CanisterChildrenOps::role_parent(target_canister)
        .filter(|(_, parent_pid)| direct_child_refill_parent_matches(*parent_pid, source_canister))
        .map(|(role, _)| role)
}

/// Returns `true` only when a parent is present and equals `source_canister`.
fn direct_child_refill_parent_matches(
    parent_pid: Option<Principal>,
    source_canister: Principal,
) -> bool {
    matches!(parent_pid, Some(parent) if parent == source_canister)
}