/*
 * Hedera Rust SDK
 *
 * Copyright (C) 2022 - 2023 Hedera Hashgraph, LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::ExponentialBackoff;
use prost::Message;
use rand::thread_rng;
use tokio::time::sleep;
use tonic::transport::Channel;

use crate::{
    AccountId,
    Client,
    Error,
    LedgerId,
    Status,
    TransactionId,
};

/// A request (query or transaction) that can be submitted to the network and
/// retried by [`execute`].
#[async_trait]
pub(crate) trait Execute {
    /// The raw protobuf request sent over gRPC.
    type GrpcRequest: Clone + Message;

    /// The raw protobuf response received over gRPC.
    type GrpcResponse: Message;

    /// Additional context returned from each call to `make_request`. Upon
    /// a successful request, the associated response context is passed to
    /// `make_response`.
    type Context: Send;

    /// The SDK-level response type produced by a successful execution.
    type Response;
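
    // For orientation (hypothetical choices, not prescribed by the trait): a
    // query-like implementor might use its protobuf query/response types for
    // `GrpcRequest` and `GrpcResponse`, `Context = ()`, and its own typed
    // output for `Response`.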

    /// Get the _explicit_ nodes that this request will be submitted to.
    fn node_account_ids(&self) -> Option<&[AccountId]>;

    /// Get the _explicit_ transaction ID that this request will use.
    fn transaction_id(&self) -> Option<TransactionId>;

    /// Get whether this request requires a transaction ID (one will be
    /// generated if it was not set explicitly).
    fn requires_transaction_id(&self) -> bool;

    /// Check whether to retry a given pre-check status.
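    ///
    /// Defaults to `false`; an implementor might override this to treat
    /// transient statuses as retryable (for example, a receipt query polling
    /// until its receipt exists).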
    fn should_retry_pre_check(&self, _status: Status) -> bool {
        false
    }

    /// Check whether we should retry an otherwise successful response.
    #[allow(unused_variables)]
    fn should_retry(&self, response: &Self::GrpcResponse) -> bool {
        false
    }

    /// Create a new request for execution.
    ///
    /// A created request is cached per node until any request returns
    /// `TransactionExpired`, in which case the request cache is cleared.
    fn make_request(
        &self,
        transaction_id: &Option<TransactionId>,
        node_account_id: AccountId,
    ) -> crate::Result<(Self::GrpcRequest, Self::Context)>;

    /// Execute the created gRPC request against the provided gRPC channel.
    async fn execute(
        &self,
        channel: Channel,
        request: Self::GrpcRequest,
    ) -> Result<tonic::Response<Self::GrpcResponse>, tonic::Status>;

    /// Create a response from the gRPC response, the per-request context, and
    /// the node account ID and transaction ID saved from the successful request.
    fn make_response(
        &self,
        response: Self::GrpcResponse,
        context: Self::Context,
        node_account_id: AccountId,
        transaction_id: Option<TransactionId>,
    ) -> crate::Result<Self::Response>;

    /// Create an error from the given pre-check status.
    fn make_error_pre_check(
        &self,
        status: Status,
        transaction_id: Option<TransactionId>,
    ) -> crate::Error;

    /// Extract the pre-check status from the gRPC response.
    fn response_pre_check_status(response: &Self::GrpcResponse) -> crate::Result<i32>;

    /// Validate that every entity-ID checksum in this request matches the given ledger.
    fn validate_checksums_for_ledger_id(&self, ledger_id: &LedgerId) -> Result<(), Error>;
}
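
// Call order per attempt, for orientation: `make_request` builds the protobuf
// for a single node, `execute` sends it over that node's channel, and
// `response_pre_check_status` is inspected; an `Ok` pre-check flows into
// `should_retry`/`make_response`, while a failed pre-check flows into
// `make_error_pre_check`.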

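/// Drive an [`Execute`] implementor to completion: select nodes, submit the
/// request, and retry with exponential backoff until success, a fatal error,
/// or the timeout elapses.
///
/// A minimal sketch of a call site (the `executable` value and the 30-second
/// timeout are illustrative, not prescribed by this module):
///
/// ```ignore
/// let response = execute(&client, &executable, std::time::Duration::from_secs(30)).await?;
/// ```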
pub(crate) async fn execute<E>(
    client: &Client,
    executable: &E,
    timeout: impl Into<Option<std::time::Duration>> + Send,
) -> crate::Result<E::Response>
where
    E: Execute + Sync,
{
    let timeout: Option<std::time::Duration> = timeout.into();

    let timeout = timeout.or_else(|| client.request_timeout()).unwrap_or_else(|| {
        std::time::Duration::from_millis(backoff::default::MAX_ELAPSED_TIME_MILLIS)
    });
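    // (the `backoff` crate's default maximum elapsed time is 15 minutes)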

    // the overall timeout for the backoff starts measuring from here
    let mut backoff =
        ExponentialBackoff { max_elapsed_time: Some(timeout), ..ExponentialBackoff::default() };
    let mut last_error: Option<Error> = None;

    if client.auto_validate_checksums() {
        if let Some(ledger_id) = &*client.ledger_id_internal() {
            executable.validate_checksums_for_ledger_id(ledger_id)?;
        } else {
            return Err(Error::CannotPerformTaskWithoutLedgerId { task: "validate checksums" });
        }
    }
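    // (entity-ID checksums are the 5-letter suffix in IDs like `0.0.123-vfmkw`
    // and are only meaningful relative to a specific ledger)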

    // TODO: cache requests to avoid signing a new request for every node in a delayed back-off

    // if we need to generate a transaction ID for this request (and one was not provided),
    // generate one now
    let explicit_transaction_id = executable.transaction_id();
    let mut transaction_id = match executable.requires_transaction_id() {
        false => None,
        true => match explicit_transaction_id {
            Some(id) => Some(id),
            None => client.generate_transaction_id().await,
        },
    };

    // if we were explicitly given a list of nodes to use, we iterate through each
    // of the given nodes (in a random order)

    let explicit_node_indexes = executable
        .node_account_ids()
        .map(|ids| client.network().node_indexes_for_ids(ids))
        .transpose()?;

    // the outer loop continues until we time out or reach the maximum number of
    // "attempts"; an attempt is counted when we have a successful response from a
    // node that must either be retried immediately (on a new node) or retried
    // after a backoff.
    loop {
        // if there is no explicit set of node account IDs, we randomly sample
        // 1/3 of all healthy nodes on the client; this set of healthy nodes
        // can change on each iteration

        let healthy_node_indexes: Option<Vec<_>> = explicit_node_indexes
            .is_none()
            .then(|| client.network().healthy_node_indexes().collect());

        let node_indexes =
            explicit_node_indexes.as_deref().or(healthy_node_indexes.as_deref()).unwrap();

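        // `(n + 2) / 3` is integer ceiling division: sample ceil(n / 3) of the
        // healthy nodes when no explicit node list was given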
        let node_sample_amount = if explicit_node_indexes.is_none() {
            (node_indexes.len() + 2) / 3
        } else {
            node_indexes.len()
        };

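        // sample without replacement so each node is tried at most once per pass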
        let node_index_indexes =
            rand::seq::index::sample(&mut thread_rng(), node_indexes.len(), node_sample_amount);

        for index in node_index_indexes.iter() {
            let node_index = node_indexes[index];
            let (node_account_id, channel) = client.network().channel(node_index);

            let (request, context) = executable.make_request(&transaction_id, node_account_id)?;

            let response = match executable.execute(channel, request).await {
                Ok(response) => response.into_inner(),
                Err(status) => {
                    match status.code() {
                        tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
                            // NOTE: this is an "unhealthy" node
                            client.network().mark_node_unhealthy(node_index);

                            // try the next node in our allowed list, immediately
                            last_error = Some(status.into());
                            continue;
                        }

                        _ => {
                            // fail immediately
                            return Err(status.into());
                        }
                    }
                }
            };

            let pre_check_status = E::response_pre_check_status(&response)?;

            match Status::from_i32(pre_check_status) {
                Some(status) => match status {
                    Status::Ok if executable.should_retry(&response) => {
                        last_error = Some(executable.make_error_pre_check(status, transaction_id));
                        break;
                    }

                    Status::Ok => {
                        return executable.make_response(
                            response,
                            context,
                            node_account_id,
                            transaction_id,
                        );
                    }

                    Status::Busy | Status::PlatformNotActive => {
                        // NOTE: this is a "busy" node
                        // try the next node in our allowed list, immediately
                        last_error = Some(executable.make_error_pre_check(status, transaction_id));
                        continue;
                    }

                    Status::TransactionExpired if explicit_transaction_id.is_none() => {
                        // the transaction that was generated has since expired
                        // re-generate the transaction ID and try again, immediately
                        last_error = Some(executable.make_error_pre_check(status, transaction_id));
                        transaction_id = client.generate_transaction_id().await;
                        continue;
                    }

                    _ if executable.should_retry_pre_check(status) => {
                        // conditional retry on pre-check should back-off and try again
                        last_error = Some(executable.make_error_pre_check(status, transaction_id));
                        break;
                    }

                    _ => {
                        // any other pre-check is an error the user needs to fix; fail immediately
                        return Err(executable.make_error_pre_check(status, transaction_id));
                    }
                },

                None => {
                    // not sure how to proceed, fail immediately
                    return Err(Error::ResponseStatusUnrecognized(pre_check_status));
                }
            }
        }

        // we tried each node; suspend execution until the next backoff interval
        if let Some(duration) = backoff.next_backoff() {
            sleep(duration).await;
        } else {
            // maximum time allowed has elapsed
            // NOTE: it should be impossible to reach here without capturing at least one error
            return Err(Error::TimedOut(last_error.unwrap().into()));
        }
    }
}