use crate::{
InternalError, InternalErrorOrigin,
ids::IntentResourceKey,
ops::{
ic::{
IcOps,
call::{CallBuilder as OpsCallBuilder, CallOps, CallResult as OpsCallResult},
},
storage::intent::IntentStoreOps,
},
workflow::{prelude::*, runtime::intent::IntentCleanupWorkflow},
};
use candid::utils::{ArgumentDecoder, ArgumentEncoder};
use serde::de::DeserializeOwned;
use std::borrow::Cow;
/// Namespace type: its associated functions start a [`CallBuilder`] for a
/// method on a target canister.
pub struct CallWorkflow;
impl CallWorkflow {
    /// Starts a builder backed by `CallOps::bounded_wait` for `method` on
    /// `canister_id`.
    ///
    /// The returned builder carries no intent; attach one with
    /// [`CallBuilder::with_intent`] if the call should reserve a resource.
    #[must_use]
    pub fn bounded_wait(canister_id: impl Into<Principal>, method: &str) -> CallBuilder<'static> {
        let inner = CallOps::bounded_wait(canister_id, method);
        CallBuilder {
            inner,
            intent: None,
        }
    }

    /// Starts a builder backed by `CallOps::unbounded_wait` for `method` on
    /// `canister_id`.
    ///
    /// The returned builder carries no intent; attach one with
    /// [`CallBuilder::with_intent`] if the call should reserve a resource.
    #[must_use]
    pub fn unbounded_wait(canister_id: impl Into<Principal>, method: &str) -> CallBuilder<'static> {
        let inner = CallOps::unbounded_wait(canister_id, method);
        CallBuilder {
            inner,
            intent: None,
        }
    }
}
/// Describes the resource reservation ("intent") that [`CallBuilder::execute`]
/// records around a call.
pub struct IntentSpec {
/// Resource key the reservation is counted against; it is passed through
/// `IntentResourceKey::try_new` again when the call executes.
key: IntentResourceKey,
/// Quantity reserved for this call; also added to the currently reserved
/// total when the `max_in_flight` limit is checked.
quantity: u64,
/// Optional time-to-live forwarded to `IntentStoreOps::try_reserve`
/// (presumably seconds, going by the name — confirm against the store).
ttl_secs: Option<u64>,
/// Optional ceiling on the reserved quantity for `key`; `None` skips the
/// capacity check entirely.
max_in_flight: Option<u64>,
}
impl IntentSpec {
    /// Assembles an intent specification from its parts.
    ///
    /// * `key` – resource key the reservation is tracked under.
    /// * `quantity` – amount to reserve for the call.
    /// * `ttl_secs` – optional time-to-live handed to the intent store.
    /// * `max_in_flight` – optional cap on the reserved quantity for `key`;
    ///   `None` disables the capacity check.
    ///
    /// No validation happens here; the key is checked when the call executes.
    pub const fn new(
        key: IntentResourceKey,
        quantity: u64,
        ttl_secs: Option<u64>,
        max_in_flight: Option<u64>,
    ) -> Self {
        Self { key, quantity, ttl_secs, max_in_flight }
    }
}
/// Workflow-level call builder: wraps the ops-layer builder and optionally
/// carries an [`IntentSpec`] that `execute` reserves before issuing the call.
pub struct CallBuilder<'a> {
/// Underlying ops-layer builder that the `with_arg*`/`with_cycles` methods
/// forward to.
inner: OpsCallBuilder<'a>,
/// Intent to reserve around the call; `None` means the call runs without
/// any reservation bookkeeping.
intent: Option<IntentSpec>,
}
impl CallBuilder<'_> {
    /// Forwards a single argument to the ops-layer builder's `with_arg`,
    /// keeping any attached intent.
    ///
    /// # Errors
    /// Propagates whatever error the ops-layer builder reports.
    pub fn with_arg<A>(self, arg: A) -> Result<Self, InternalError>
    where
        A: CandidType,
    {
        Ok(Self {
            inner: self.inner.with_arg(arg)?,
            intent: self.intent,
        })
    }

    /// Forwards an argument tuple to the ops-layer builder's `with_args`,
    /// keeping any attached intent.
    ///
    /// # Errors
    /// Propagates whatever error the ops-layer builder reports.
    pub fn with_args<A>(self, args: A) -> Result<Self, InternalError>
    where
        A: ArgumentEncoder,
    {
        Ok(Self {
            inner: self.inner.with_args(args)?,
            intent: self.intent,
        })
    }

    /// Supplies raw argument bytes via the ops-layer builder's
    /// `with_raw_args`.
    ///
    /// The returned builder is tied to the lifetime `'b` of the supplied
    /// bytes rather than to the lifetime of `self`.
    #[must_use]
    pub fn with_raw_args<'b>(self, args: impl Into<Cow<'b, [u8]>>) -> CallBuilder<'b> {
        CallBuilder {
            inner: self.inner.with_raw_args(args),
            intent: self.intent,
        }
    }

    /// Forwards `cycles` to the ops-layer builder's `with_cycles`, keeping
    /// any attached intent.
    #[must_use]
    pub fn with_cycles(self, cycles: u128) -> Self {
        Self {
            inner: self.inner.with_cycles(cycles),
            intent: self.intent,
        }
    }

    /// Attaches `intent` to the builder, replacing any intent set earlier.
    #[must_use]
    pub fn with_intent(self, intent: IntentSpec) -> Self {
        Self {
            intent: Some(intent),
            ..self
        }
    }

    /// Issues the call, wrapping it in intent bookkeeping when an intent is
    /// attached.
    ///
    /// Without an intent the ops-layer call is executed directly. With an
    /// intent the steps are, in order:
    ///
    /// 1. validate the resource key and make sure the cleanup workflow runs;
    /// 2. if `max_in_flight` is set, reject the call when the currently
    ///    reserved quantity plus the requested quantity would exceed it;
    /// 3. allocate an intent id and reserve the quantity in the intent store;
    /// 4. execute the call;
    /// 5. on success commit the intent, on failure abort it.
    ///
    /// # Errors
    /// * an invariant error if the key is invalid or the in-flight sum
    ///   overflows;
    /// * a domain error if the capacity limit would be exceeded;
    /// * any error from id allocation, reservation, or the call itself. If
    ///   aborting the intent also fails, the call error is rebuilt with the
    ///   same class and origin and a message that includes the abort failure.
    ///
    /// A failed commit after a successful call is only logged; the call
    /// result is still returned.
    pub async fn execute(self) -> Result<CallResult, InternalError> {
        let Self { inner, intent } = self;
        // Single timestamp, captured up front and reused for every
        // store operation below, including the commit after the call.
        let now = IcOps::now_secs();

        // Fast path: nothing to reserve, just run the call.
        let Some(spec) = intent else {
            let inner = inner.execute().await?;
            return Ok(CallResult { inner });
        };

        let resource_key = match IntentResourceKey::try_new(spec.key) {
            Ok(key) => key,
            Err(err) => {
                return Err(InternalError::invariant(
                    InternalErrorOrigin::Workflow,
                    format!("intent key invalid: {err}"),
                ));
            }
        };

        IntentCleanupWorkflow::ensure_started();

        // Capacity gate: only enforced when a limit was configured.
        if let Some(max_in_flight) = spec.max_in_flight {
            let in_flight = IntentStoreOps::totals_at(&resource_key, now).reserved_qty;
            if next_in_flight_quantity(in_flight, spec.quantity)? > max_in_flight {
                return Err(InternalError::domain(
                    InternalErrorOrigin::Domain,
                    format!(
                        "intent capacity exceeded key={resource_key} in_flight={in_flight} \
                         requested={} max_in_flight={max_in_flight}",
                        spec.quantity
                    ),
                ));
            }
        }

        // Reserve before the call so the quantity counts as in flight while
        // it is pending. `now` serves as both the creation time and the
        // current time of the reservation.
        let intent_id = IntentStoreOps::allocate_intent_id()?;
        let _ = IntentStoreOps::try_reserve(
            intent_id,
            resource_key.clone(),
            spec.quantity,
            now,
            spec.ttl_secs,
            now,
        )?;

        let response = match inner.execute().await {
            Ok(response) => response,
            Err(call_err) => {
                // The call failed: release the reservation. A clean abort
                // returns the call error untouched.
                let Err(abort_err) = IntentStoreOps::abort(intent_id) else {
                    return Err(call_err);
                };
                // Abort failed too: keep the call error's class and origin
                // and append the abort failure to the message.
                let message = format!("{call_err}; intent abort failed: {abort_err}");
                return Err(InternalError::new(
                    call_err.class(),
                    call_err.origin(),
                    message,
                ));
            }
        };

        // The call succeeded. A commit failure must not turn a delivered
        // call into an error, so it is logged and otherwise ignored.
        if let Err(err) = IntentStoreOps::commit_at(intent_id, now) {
            crate::log!(
                Error,
                "intent commit failed id={intent_id} key={resource_key}: {err}"
            );
        }
        Ok(CallResult { inner: response })
    }
}
/// Returns `in_flight + requested`, the reserved total that would result
/// from accepting the new request.
///
/// # Errors
/// Returns an invariant error when the sum does not fit in a `u64`.
fn next_in_flight_quantity(in_flight: u64, requested: u64) -> Result<u64, InternalError> {
    let Some(total) = in_flight.checked_add(requested) else {
        return Err(InternalError::invariant(
            InternalErrorOrigin::Workflow,
            "intent reservation overflow",
        ));
    };
    Ok(total)
}
/// Response of a call made through [`CallBuilder::execute`]; decoding is
/// delegated to the wrapped ops-layer result.
pub struct CallResult {
/// Ops-layer result that `candid` and `candid_tuple` forward to.
inner: OpsCallResult,
}
impl CallResult {
    /// Decodes the response as a single value of type `R` by delegating to
    /// the ops-layer result's `candid`.
    ///
    /// # Errors
    /// Propagates the ops-layer decoding error.
    pub fn candid<R>(&self) -> Result<R, InternalError>
    where
        R: CandidType + DeserializeOwned,
    {
        let Self { inner } = self;
        inner.candid()
    }

    /// Decodes the response as a tuple of type `R` by delegating to the
    /// ops-layer result's `candid_tuple`.
    ///
    /// # Errors
    /// Propagates the ops-layer decoding error.
    pub fn candid_tuple<R>(&self) -> Result<R, InternalError>
    where
        R: for<'de> ArgumentDecoder<'de>,
    {
        let Self { inner } = self;
        inner.candid_tuple()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A sum that exceeds `u64::MAX` must be reported, not wrapped.
    #[test]
    fn next_in_flight_rejects_overflow() {
        let err = next_in_flight_quantity(u64::MAX, 1).expect_err("overflow must fail");
        assert!(err.to_string().contains("intent reservation overflow"));
    }

    /// Overflow is detected regardless of which operand is the large one.
    #[test]
    fn next_in_flight_rejects_overflow_with_swapped_operands() {
        assert!(next_in_flight_quantity(1, u64::MAX).is_err());
        assert!(next_in_flight_quantity(u64::MAX, u64::MAX).is_err());
    }

    /// Ordinary sums are returned unchanged.
    #[test]
    fn next_in_flight_sums_without_overflow() {
        // `.ok()` avoids requiring `Debug` on the error type.
        assert_eq!(next_in_flight_quantity(0, 0).ok(), Some(0));
        assert_eq!(next_in_flight_quantity(2, 3).ok(), Some(5));
    }

    /// A sum landing exactly on `u64::MAX` is still valid.
    #[test]
    fn next_in_flight_accepts_exact_u64_max() {
        assert_eq!(
            next_in_flight_quantity(u64::MAX - 1, 1).ok(),
            Some(u64::MAX)
        );
        assert_eq!(next_in_flight_quantity(u64::MAX, 0).ok(), Some(u64::MAX));
    }
}