use crate::api::{cost_call, msg_arg_data, msg_reject_code, msg_reject_msg};
use crate::{futures::is_recovering_from_trap, trap};
use candid::utils::{ArgumentDecoder, ArgumentEncoder, encode_args_ref};
use candid::{CandidType, Deserialize, Principal, decode_args, decode_one, encode_one};
use ic_cdk_executor::{MethodHandle, TaskHandle};
use std::borrow::Cow;
use std::future::IntoFuture;
use std::mem;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{Context, Poll, Waker};
use thiserror::Error;
pub use ic_error_types::RejectCode;
#[derive(Debug, Clone)]
pub struct Call<'m, 'a> {
canister_id: Principal,
method: &'m str,
cycles: u128,
timeout_seconds: Option<u32>,
encoded_args: Cow<'a, [u8]>,
}
impl<'m> Call<'m, '_> {
#[must_use]
pub fn bounded_wait(canister_id: Principal, method: &'m str) -> Self {
Self {
canister_id,
method,
cycles: 0,
timeout_seconds: Some(300),
encoded_args: Cow::Owned(vec![0x44, 0x49, 0x44, 0x4c, 0x00, 0x00]),
}
}
#[must_use]
pub fn unbounded_wait(canister_id: Principal, method: &'m str) -> Self {
Self {
canister_id,
method,
cycles: 0,
timeout_seconds: None,
encoded_args: Cow::Owned(vec![0x44, 0x49, 0x44, 0x4c, 0x00, 0x00]),
}
}
}
impl<'a> Call<'_, 'a> {
#[must_use]
pub fn with_arg<A: CandidType>(self, arg: A) -> Self {
Self {
encoded_args: Cow::Owned(encode_one(&arg).unwrap_or_else(panic_when_encode_fails)),
..self
}
}
#[must_use]
pub fn with_args<A: ArgumentEncoder>(self, args: &A) -> Self {
Self {
encoded_args: Cow::Owned(encode_args_ref(args).unwrap_or_else(panic_when_encode_fails)),
..self
}
}
#[must_use]
pub fn with_raw_args(self, raw_args: &'a [u8]) -> Self {
Self {
encoded_args: Cow::Borrowed(raw_args),
..self
}
}
#[must_use]
pub fn take_raw_args(self, raw_args: Vec<u8>) -> Self {
Self {
encoded_args: Cow::Owned(raw_args),
..self
}
}
#[must_use]
pub fn with_cycles(mut self, cycles: u128) -> Self {
self.cycles = cycles;
self
}
#[must_use]
pub fn change_timeout(mut self, timeout_seconds: u32) -> Self {
match self.timeout_seconds {
Some(_) => self.timeout_seconds = Some(timeout_seconds),
None => {
panic!("Cannot set a timeout for an instance created with Call::unbounded_wait")
}
}
self
}
#[must_use]
pub fn get_cost(&self) -> u128 {
self.cycles.saturating_add(cost_call(
self.method.len() as u64,
self.encoded_args.len() as u64,
))
}
}
#[derive(Debug)]
pub struct Response(Vec<u8>);
impl Response {
pub fn into_bytes(self) -> Vec<u8> {
self.0
}
pub fn candid<R>(&self) -> Result<R, CandidDecodeFailed>
where
R: CandidType + for<'de> Deserialize<'de>,
{
decode_one(&self.0).map_err(|e| CandidDecodeFailed {
type_name: std::any::type_name::<R>().to_string(),
candid_error: e.to_string(),
})
}
pub fn candid_tuple<R>(&self) -> Result<R, CandidDecodeFailed>
where
R: for<'de> ArgumentDecoder<'de>,
{
decode_args(&self.0).map_err(|e| CandidDecodeFailed {
type_name: std::any::type_name::<R>().to_string(),
candid_error: e.to_string(),
})
}
}
impl PartialEq<&[u8]> for Response {
fn eq(&self, other: &&[u8]) -> bool {
self.0 == *other
}
}
impl PartialEq<Vec<u8>> for Response {
fn eq(&self, other: &Vec<u8>) -> bool {
self.0 == *other
}
}
impl PartialEq for Response {
fn eq(&self, other: &Self) -> bool {
self.0 == other.0
}
}
impl std::ops::Deref for Response {
type Target = [u8];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl AsRef<[u8]> for Response {
fn as_ref(&self) -> &[u8] {
&self.0
}
}
impl std::borrow::Borrow<[u8]> for Response {
fn borrow(&self) -> &[u8] {
&self.0
}
}
#[derive(Error, Debug, Clone)]
pub enum Error {
#[error(transparent)]
InsufficientLiquidCycleBalance(#[from] InsufficientLiquidCycleBalance),
#[error(transparent)]
CallPerformFailed(#[from] CallPerformFailed),
#[error(transparent)]
CallRejected(#[from] CallRejected),
#[error(transparent)]
CandidDecodeFailed(#[from] CandidDecodeFailed),
}
#[derive(Error, Debug, Clone)]
pub enum CallFailed {
#[error(transparent)]
InsufficientLiquidCycleBalance(#[from] InsufficientLiquidCycleBalance),
#[error(transparent)]
CallPerformFailed(#[from] CallPerformFailed),
#[error(transparent)]
CallRejected(#[from] CallRejected),
}
#[derive(Error, Debug, Clone)]
pub enum OnewayError {
#[error(transparent)]
InsufficientLiquidCycleBalance(#[from] InsufficientLiquidCycleBalance),
#[error(transparent)]
CallPerformFailed(#[from] CallPerformFailed),
}
impl From<OnewayError> for Error {
fn from(e: OnewayError) -> Self {
match e {
OnewayError::InsufficientLiquidCycleBalance(e) => {
Error::InsufficientLiquidCycleBalance(e)
}
OnewayError::CallPerformFailed(e) => Error::CallPerformFailed(e),
}
}
}
impl From<CallFailed> for Error {
fn from(e: CallFailed) -> Self {
match e {
CallFailed::InsufficientLiquidCycleBalance(e) => {
Error::InsufficientLiquidCycleBalance(e)
}
CallFailed::CallPerformFailed(e) => Error::CallPerformFailed(e),
CallFailed::CallRejected(e) => Error::CallRejected(e),
}
}
}
#[derive(Error, Debug, Clone)]
#[error("insufficient liquid cycles balance, available: {available}, required: {required}")]
pub struct InsufficientLiquidCycleBalance {
pub available: u128,
pub required: u128,
}
#[derive(Error, Debug, Clone)]
#[error("call perform failed")]
pub struct CallPerformFailed;
#[derive(Error, Debug, Clone)]
#[error("call rejected: {raw_reject_code} - {reject_message}")]
pub struct CallRejected {
raw_reject_code: u32,
reject_message: String,
}
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[error("unrecognized reject code: {0}")]
pub struct UnrecognizedRejectCode(u32);
impl CallRejected {
pub fn with_rejection(raw_reject_code: u32, reject_message: String) -> Self {
Self {
raw_reject_code,
reject_message,
}
}
pub fn reject_code(&self) -> Result<RejectCode, UnrecognizedRejectCode> {
RejectCode::try_from(u64::from(self.raw_reject_code))
.map_err(|_| UnrecognizedRejectCode(self.raw_reject_code))
}
pub fn raw_reject_code(&self) -> u32 {
self.raw_reject_code
}
pub fn reject_message(&self) -> &str {
&self.reject_message
}
}
#[derive(Error, Debug, Clone)]
#[error("candid decode failed for type: {type_name}, candid error: {candid_error}")]
pub struct CandidDecodeFailed {
type_name: String,
candid_error: String,
}
pub trait CallErrorExt {
fn is_clean_reject(&self) -> bool;
fn is_immediately_retryable(&self) -> bool;
}
impl CallErrorExt for InsufficientLiquidCycleBalance {
fn is_clean_reject(&self) -> bool {
true
}
fn is_immediately_retryable(&self) -> bool {
false
}
}
impl CallErrorExt for CallPerformFailed {
fn is_clean_reject(&self) -> bool {
true
}
fn is_immediately_retryable(&self) -> bool {
false
}
}
impl CallErrorExt for CallRejected {
fn is_clean_reject(&self) -> bool {
let clean_reject_codes: Vec<u32> = vec![
RejectCode::SysFatal as u32,
RejectCode::SysTransient as u32,
RejectCode::DestinationInvalid as u32,
];
clean_reject_codes.contains(&self.raw_reject_code)
}
fn is_immediately_retryable(&self) -> bool {
let immediately_retryable_codes: Vec<u32> = vec![
RejectCode::SysTransient as u32,
RejectCode::SysUnknown as u32,
];
immediately_retryable_codes.contains(&self.raw_reject_code)
}
}
impl CallErrorExt for CandidDecodeFailed {
fn is_clean_reject(&self) -> bool {
false
}
fn is_immediately_retryable(&self) -> bool {
false
}
}
impl CallErrorExt for Error {
fn is_clean_reject(&self) -> bool {
match self {
Error::InsufficientLiquidCycleBalance(e) => e.is_clean_reject(),
Error::CallPerformFailed(e) => e.is_clean_reject(),
Error::CallRejected(e) => e.is_clean_reject(),
Error::CandidDecodeFailed(e) => e.is_clean_reject(),
}
}
fn is_immediately_retryable(&self) -> bool {
match self {
Error::InsufficientLiquidCycleBalance(e) => e.is_immediately_retryable(),
Error::CallPerformFailed(e) => e.is_immediately_retryable(),
Error::CallRejected(e) => e.is_immediately_retryable(),
Error::CandidDecodeFailed(e) => e.is_immediately_retryable(),
}
}
}
impl CallErrorExt for CallFailed {
fn is_clean_reject(&self) -> bool {
match self {
CallFailed::InsufficientLiquidCycleBalance(e) => e.is_clean_reject(),
CallFailed::CallPerformFailed(e) => e.is_clean_reject(),
CallFailed::CallRejected(e) => e.is_clean_reject(),
}
}
fn is_immediately_retryable(&self) -> bool {
match self {
CallFailed::InsufficientLiquidCycleBalance(e) => e.is_immediately_retryable(),
CallFailed::CallPerformFailed(e) => e.is_immediately_retryable(),
CallFailed::CallRejected(e) => e.is_immediately_retryable(),
}
}
}
impl CallErrorExt for OnewayError {
fn is_clean_reject(&self) -> bool {
match self {
OnewayError::InsufficientLiquidCycleBalance(e) => e.is_clean_reject(),
OnewayError::CallPerformFailed(e) => e.is_clean_reject(),
}
}
fn is_immediately_retryable(&self) -> bool {
match self {
OnewayError::InsufficientLiquidCycleBalance(e) => e.is_immediately_retryable(),
OnewayError::CallPerformFailed(e) => e.is_immediately_retryable(),
}
}
}
pub type CallResult<R> = Result<R, Error>;
impl<'m, 'a> IntoFuture for Call<'m, 'a> {
type Output = Result<Response, CallFailed>;
type IntoFuture = CallFuture<'m, 'a>;
fn into_future(self) -> Self::IntoFuture {
CallFuture {
state: Arc::new(RwLock::new(CallFutureState::Prepared { call: self })),
}
}
}
impl Call<'_, '_> {
pub fn oneway(&self) -> Result<(), OnewayError> {
self.check_liquid_cycle_balance_sufficient()?;
match self.perform(None) {
0 => Ok(()),
_ => Err(CallPerformFailed.into()),
}
}
fn check_liquid_cycle_balance_sufficient(&self) -> Result<(), InsufficientLiquidCycleBalance> {
let required = self.get_cost();
let available = crate::api::canister_liquid_cycle_balance();
if available >= required {
Ok(())
} else {
Err(InsufficientLiquidCycleBalance {
available,
required,
})
}
}
fn perform(&self, state_opt: Option<Arc<RwLock<CallFutureState<'_, '_>>>>) -> u32 {
let callee = self.canister_id.as_slice();
let method = self.method;
let arg = match &self.encoded_args {
Cow::Owned(vec) => vec,
Cow::Borrowed(r) => *r,
};
let state_ptr_opt = state_opt.map(Arc::<RwLock<CallFutureState<'_, '_>>>::into_raw);
match state_ptr_opt {
Some(state_ptr) => {
unsafe {
ic0::call_new(
callee,
method,
callback,
state_ptr as usize,
callback,
state_ptr as usize,
);
ic0::call_on_cleanup(cleanup, state_ptr as usize);
}
}
None => {
ic0::call_new_oneway(callee, method);
}
}
if !arg.is_empty() {
ic0::call_data_append(arg);
}
if self.cycles > 0 {
ic0::call_cycles_add128(self.cycles);
}
if let Some(timeout_seconds) = self.timeout_seconds {
ic0::call_with_best_effort_response(timeout_seconds);
}
let res = ic0::call_perform();
if res != 0
&& let Some(state_ptr) = state_ptr_opt
{
unsafe {
Arc::from_raw(state_ptr);
}
}
res
}
}
#[derive(Debug, Default)]
enum CallFutureState<'m, 'a> {
Prepared { call: Call<'m, 'a> },
Executing {
waker: Waker,
method: MethodHandle,
task: Option<TaskHandle>,
},
Complete {
result: Result<Response, CallFailed>,
},
#[default]
PostComplete,
Trapped,
}
#[derive(Debug)]
pub struct CallFuture<'m, 'a> {
state: Arc<RwLock<CallFutureState<'m, 'a>>>,
}
impl std::future::Future for CallFuture<'_, '_> {
type Output = Result<Response, CallFailed>;
fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
let self_ref = Pin::into_inner(self);
let mut state = self_ref.state.write().unwrap();
match mem::take(&mut *state) {
CallFutureState::Prepared { call } => {
if let Err(e) = call.check_liquid_cycle_balance_sufficient() {
*state = CallFutureState::PostComplete;
Poll::Ready(Err(e.into()))
} else {
match call.perform(Some(self_ref.state.clone())) {
0 => {
*state = CallFutureState::Executing {
waker: context.waker().clone(),
method: ic_cdk_executor::extend_current_method_context(),
task: TaskHandle::current(),
};
Poll::Pending
}
_ => {
*state = CallFutureState::PostComplete;
Poll::Ready(Err(CallPerformFailed.into()))
}
}
}
}
CallFutureState::Executing { method, task, .. } => {
*state = CallFutureState::Executing {
waker: context.waker().clone(),
method,
task,
};
Poll::Pending
}
CallFutureState::Complete { result } => {
*state = CallFutureState::PostComplete;
Poll::Ready(result)
}
CallFutureState::Trapped => trap("Call already trapped"),
CallFutureState::PostComplete => trap("CallFuture polled after completing"),
}
}
}
impl Drop for CallFuture<'_, '_> {
fn drop(&mut self) {
if is_recovering_from_trap() {
*self.state.write().unwrap() = CallFutureState::Trapped;
}
}
}
unsafe extern "C" fn callback(env: usize) {
let state_ptr = env as *const RwLock<CallFutureState<'_, '_>>;
let state = unsafe { Arc::from_raw(state_ptr) };
let completed_state = CallFutureState::Complete {
result: match msg_reject_code() {
0 => Ok(Response(msg_arg_data())),
code => {
Err(CallFailed::CallRejected(CallRejected {
raw_reject_code: code,
reject_message: msg_reject_msg(),
}))
}
},
};
let (waker, method) = match mem::replace(&mut *state.write().unwrap(), completed_state) {
CallFutureState::Executing { waker, method, .. } => (waker, method),
CallFutureState::Trapped => trap("Call already trapped"),
_ => {
unreachable!(
"CallFutureState for in-flight calls should only be Executing or Trapped (callback)"
)
}
};
ic_cdk_executor::in_callback_executor_context_for(method, || {
waker.wake();
});
}
unsafe extern "C" fn cleanup(env: usize) {
let state_ptr = env as *const RwLock<CallFutureState<'_, '_>>;
let state = unsafe { Arc::from_raw(state_ptr) };
let err_state = CallFutureState::Complete {
result: Err(CallFailed::CallRejected(CallRejected {
raw_reject_code: RejectCode::CanisterReject as u32,
reject_message: "cleanup".into(),
})),
};
let (method, task) = match mem::replace(&mut *state.write().unwrap(), err_state) {
CallFutureState::Executing { method, task, .. } => (method, task),
CallFutureState::Trapped => {
return;
}
_ => {
unreachable!(
"CallFutureState for in-flight calls should only be Executing or Trapped (cleanup)"
)
}
};
ic_cdk_executor::in_trap_recovery_context_for(method, || {
ic_cdk_executor::cancel_all_tasks_attached_to_current_method();
if let Some(task) = task {
ic_cdk_executor::cancel_task(&task);
}
});
}
fn panic_when_encode_fails(err: candid::error::Error) -> Vec<u8> {
panic!("failed to encode args: {err}")
}