canic-core 0.26.7

Canic — a canister orchestration and management toolkit for the Internet Computer
Documentation
use crate::{
    InternalError, InternalErrorOrigin,
    ids::IntentResourceKey,
    ops::{
        ic::{
            IcOps,
            call::{CallBuilder as OpsCallBuilder, CallOps, CallResult as OpsCallResult},
        },
        storage::intent::IntentStoreOps,
    },
    workflow::{prelude::*, runtime::intent::IntentCleanupWorkflow},
};
use candid::utils::{ArgumentDecoder, ArgumentEncoder};
use serde::de::DeserializeOwned;
use std::borrow::Cow;

///
/// CallWorkflow
/// Namespace type: its associated functions create a workflow-level
/// `CallBuilder` on top of the ops-layer `CallOps` constructors.
///

pub struct CallWorkflow;

impl CallWorkflow {
    /// Start building a call to `method` on `canister_id` via `CallOps::bounded_wait`.
    ///
    /// The returned builder carries no intent; attach one with `with_intent`.
    #[must_use]
    pub fn bounded_wait(canister_id: impl Into<Principal>, method: &str) -> CallBuilder<'static> {
        let inner = CallOps::bounded_wait(canister_id, method);

        CallBuilder {
            inner,
            intent: None,
        }
    }

    /// Start building a call to `method` on `canister_id` via `CallOps::unbounded_wait`.
    ///
    /// The returned builder carries no intent; attach one with `with_intent`.
    #[must_use]
    pub fn unbounded_wait(canister_id: impl Into<Principal>, method: &str) -> CallBuilder<'static> {
        let inner = CallOps::unbounded_wait(canister_id, method);

        CallBuilder {
            inner,
            intent: None,
        }
    }
}

///
/// IntentSpec
/// Internal intent spec for call orchestration.
///
/// Describes the reservation that `CallBuilder::execute` places in the intent
/// store around a call once the spec is attached via `CallBuilder::with_intent`.
///

pub struct IntentSpec {
    /// Resource key the reservation is recorded under.
    key: IntentResourceKey,
    /// Quantity reserved for the call.
    quantity: u64,
    /// Optional time-to-live, forwarded unchanged to `IntentStoreOps::try_reserve`.
    ttl_secs: Option<u64>,
    /// Optional cap: `execute` rejects the call when the quantity currently
    /// reserved for `key` plus `quantity` would exceed this value.
    max_in_flight: Option<u64>,
}

impl IntentSpec {
    pub const fn new(
        key: IntentResourceKey,
        quantity: u64,
        ttl_secs: Option<u64>,
        max_in_flight: Option<u64>,
    ) -> Self {
        Self {
            key,
            quantity,
            ttl_secs,
            max_in_flight,
        }
    }
}

///
/// CallBuilder (workflow)
/// Wraps the ops-layer call builder and optionally carries an `IntentSpec`
/// that `execute` uses to reserve, then commit or abort, around the call.
///

pub struct CallBuilder<'a> {
    /// Ops-layer builder; argument encoding, cycles and execution are delegated to it.
    inner: OpsCallBuilder<'a>,
    /// Reservation to wrap around the call; `None` executes the call directly.
    intent: Option<IntentSpec>,
}

impl CallBuilder<'_> {
    // ---------- arguments ----------

    /// Encode a single argument into Candid bytes (fallible).
    ///
    /// # Errors
    /// Propagates the error returned by the ops-layer builder's `with_arg`.
    pub fn with_arg<A>(self, arg: A) -> Result<Self, InternalError>
    where
        A: CandidType,
    {
        let Self { inner, intent } = self;
        let inner = inner.with_arg(arg)?;

        Ok(Self { inner, intent })
    }

    /// Encode multiple arguments into Candid bytes (fallible).
    ///
    /// # Errors
    /// Propagates the error returned by the ops-layer builder's `with_args`.
    pub fn with_args<A>(self, args: A) -> Result<Self, InternalError>
    where
        A: ArgumentEncoder,
    {
        let Self { inner, intent } = self;
        let inner = inner.with_args(args)?;

        Ok(Self { inner, intent })
    }

    /// Use pre-encoded Candid arguments (no validation performed).
    ///
    /// The returned builder is tied to the lifetime of `args`; any attached
    /// intent is carried over.
    #[must_use]
    pub fn with_raw_args<'b>(self, args: impl Into<Cow<'b, [u8]>>) -> CallBuilder<'b> {
        let Self { inner, intent } = self;
        let inner = inner.with_raw_args(args);

        CallBuilder { inner, intent }
    }

    // ---------- cycles ----------

    /// Forward `cycles` to the ops-layer builder; any attached intent is kept.
    #[must_use]
    pub fn with_cycles(self, cycles: u128) -> Self {
        let Self { inner, intent } = self;

        Self {
            inner: inner.with_cycles(cycles),
            intent,
        }
    }

    // ---------- intent ----------

    /// Attach an intent to the call, replacing any previously attached one.
    #[must_use]
    pub fn with_intent(mut self, intent: IntentSpec) -> Self {
        self.intent = Some(intent);
        self
    }

    // ---------- execution ----------

    /// Execute the call, wrapping it in an intent reservation when one is attached.
    ///
    /// Without an intent the call is handed to the ops-layer builder as-is.
    /// With an intent:
    /// - reserve before executing the call (after the optional `max_in_flight`
    ///   check against the quantity currently reserved for the key)
    /// - commit on success; commit errors are logged, call result still returned
    /// - abort on failure; abort errors are attached to the call error
    ///
    /// # Errors
    /// - the intent key is invalid, the in-flight sum overflows, or the
    ///   `max_in_flight` cap would be exceeded (the call is not executed)
    /// - intent id allocation or the reservation fails (the call is not executed)
    /// - the call itself fails; if the follow-up abort fails too, its message is
    ///   appended while the class and origin of the call error are preserved
    pub async fn execute(self) -> Result<CallResult, InternalError> {
        let Self { inner, intent } = self;

        let Some(intent) = intent else {
            return Ok(CallResult {
                inner: inner.execute().await?,
            });
        };

        // Only the intent path needs the clock. This value is valid for the
        // capacity check and the reservation, which both run before the await.
        let reserved_at = IcOps::now_secs();

        let resource_key = IntentResourceKey::try_new(intent.key).map_err(|err| {
            InternalError::invariant(
                InternalErrorOrigin::Workflow,
                format!("intent key invalid: {err}"),
            )
        })?;

        IntentCleanupWorkflow::ensure_started();

        if let Some(max_in_flight) = intent.max_in_flight {
            let totals = IntentStoreOps::totals_at(&resource_key, reserved_at);
            let in_flight = totals.reserved_qty;
            let next = next_in_flight_quantity(in_flight, intent.quantity)?;

            if next > max_in_flight {
                return Err(InternalError::domain(
                    InternalErrorOrigin::Domain,
                    format!(
                        "intent capacity exceeded key={resource_key} in_flight={in_flight} \
                         requested={} max_in_flight={max_in_flight}",
                        intent.quantity
                    ),
                ));
            }
        }

        let intent_id = IntentStoreOps::allocate_intent_id()?;
        // The reservation is created "now", so the same reading serves as both
        // the creation timestamp and the evaluation time.
        let _ = IntentStoreOps::try_reserve(
            intent_id,
            resource_key.clone(),
            intent.quantity,
            reserved_at,
            intent.ttl_secs,
            reserved_at,
        )?;

        match inner.execute().await {
            Ok(inner) => {
                // The continuation after the await runs later than the code that
                // placed the reservation, so `reserved_at` is stale here. Re-read
                // the clock so the commit is evaluated at the time it happens.
                let committed_at = IcOps::now_secs();
                if let Err(err) = IntentStoreOps::commit_at(intent_id, committed_at) {
                    crate::log!(
                        Error,
                        "intent commit failed id={intent_id} key={resource_key}: {err}"
                    );
                }

                Ok(CallResult { inner })
            }
            Err(call_err) => {
                if let Err(abort_err) = IntentStoreOps::abort(intent_id) {
                    // Keep the call error's class/origin; only extend the message.
                    let message = format!("{call_err}; intent abort failed: {abort_err}");
                    return Err(InternalError::new(
                        call_err.class(),
                        call_err.origin(),
                        message,
                    ));
                }

                Err(call_err)
            }
        }
    }
}

// Add a requested reservation to the quantity already in flight.
// A sum that does not fit in u64 is reported as an invariant error
// instead of wrapping around.
fn next_in_flight_quantity(in_flight: u64, requested: u64) -> Result<u64, InternalError> {
    let Some(total) = in_flight.checked_add(requested) else {
        return Err(InternalError::invariant(
            InternalErrorOrigin::Workflow,
            "intent reservation overflow",
        ));
    };

    Ok(total)
}

///
/// CallResult (workflow)
/// Value returned by `CallBuilder::execute` on success; wraps the ops-layer
/// call result and forwards its Candid decoding helpers.
///

pub struct CallResult {
    /// Ops-layer result that the decoding helpers delegate to.
    inner: OpsCallResult,
}

impl CallResult {
    /// Decode the result into `R` by delegating to the ops-layer `candid`.
    ///
    /// # Errors
    /// Returns whatever error the ops-layer decoder reports.
    pub fn candid<R>(&self) -> Result<R, InternalError>
    where
        R: CandidType + DeserializeOwned,
    {
        let Self { inner } = self;
        inner.candid()
    }

    /// Decode the result into the tuple-like `R` by delegating to the
    /// ops-layer `candid_tuple`.
    ///
    /// # Errors
    /// Returns whatever error the ops-layer decoder reports.
    pub fn candid_tuple<R>(&self) -> Result<R, InternalError>
    where
        R: for<'de> ArgumentDecoder<'de>,
    {
        let Self { inner } = self;
        inner.candid_tuple()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Adding a reservation on top of a saturated counter must surface an
    // error rather than wrap around.
    #[test]
    fn next_in_flight_rejects_overflow() {
        let outcome = next_in_flight_quantity(u64::MAX, 1);
        let message = outcome.expect_err("overflow must fail").to_string();

        assert!(message.contains("intent reservation overflow"));
    }
}