mod error;
use std::any::type_name;
use std::borrow::Cow;
use std::ops::ControlFlow;
use std::time::{
Duration,
Instant,
};
use backoff::{
ExponentialBackoff,
ExponentialBackoffBuilder,
};
use futures_core::future::BoxFuture;
use futures_util::StreamExt;
use prost::Message;
use rand::seq::SliceRandom;
use rand::thread_rng;
use tonic::metadata::AsciiMetadataValue;
use tonic::transport::Channel;
use tonic::Request;
use triomphe::Arc;
use crate::client::NetworkData;
use crate::execute::error::is_tonic_status_transient;
use crate::ping_query::PingQuery;
use crate::{
client,
retry,
AccountId,
BoxGrpcFuture,
Client,
Error,
Status,
TransactionId,
ValidateChecksums,
};
/// A request that can be sent to a network node through the retry loop in this module.
///
/// Implementors supply the hooks for building the gRPC request, sending it, and turning
/// the node's reply into a response or an error. The free function `execute` drives node
/// selection, retries and backoff around these hooks.
pub(crate) trait Execute: ValidateChecksums {
/// Protobuf request type returned by `make_request` and passed to `execute`.
type GrpcRequest: Clone + Message;
/// Protobuf response type yielded by `execute`.
type GrpcResponse: Message;
/// Extra state returned by `make_request` alongside the request and handed back,
/// unchanged, to `make_response`.
type Context: Send;
/// The value ultimately returned to the caller on success.
type Response;
/// Operator account set explicitly on this request, if any.
///
/// When present it is preferred over the client's operator as the account used to
/// generate transaction IDs.
fn operator_account_id(&self) -> Option<&AccountId>;
/// Node account IDs this request is explicitly restricted to, if any.
///
/// When `None`, nodes are picked from the healthy nodes of the network.
fn node_account_ids(&self) -> Option<&[AccountId]>;
/// Transaction ID set explicitly on this request, if any.
///
/// An explicit ID is used as-is and is never regenerated by the retry loop.
fn transaction_id(&self) -> Option<TransactionId>;
/// Whether this request needs a transaction ID at all.
///
/// When this returns `false`, `make_request` is called with `None` as its transaction ID.
fn requires_transaction_id(&self) -> bool;
/// Per-request override for regenerating an expired transaction ID.
///
/// The default of `None` defers to the client's default setting.
fn regenerate_transaction_id(&self) -> Option<bool> {
None
}
/// Whether a pre-check `status` that the retry loop does not handle itself should be
/// retried (after backoff) instead of failing permanently. Defaults to `false`.
fn should_retry_pre_check(&self, _status: Status) -> bool {
false
}
/// Whether a response whose pre-check status is `Status::Ok` should nevertheless be
/// retried (after backoff). Defaults to `false`.
// The default implementation ignores `response`; the lint is silenced so the parameter
// can keep its descriptive name.
#[allow(unused_variables)]
fn should_retry(&self, response: &Self::GrpcResponse) -> bool {
false
}
/// Adds request metadata; the default inserts an `x-user-agent` entry of the form
/// `hiero-sdk-rust/<crate version>`.
///
/// # Panics
///
/// Panics if the formatted user agent cannot be parsed as a metadata value.
fn add_metadata(&self, metadata: &mut tonic::metadata::MetadataMap) {
let user_agent = format!("hiero-sdk-rust/{}", env!("CARGO_PKG_VERSION"));
metadata.insert("x-user-agent", user_agent.parse().unwrap());
}
/// Builds the gRPC request for the node `node_account_id`, using `transaction_id` when
/// one is in effect, and returns it together with the context for `make_response`.
///
/// # Errors
///
/// Any error returned here aborts execution permanently (it is not retried).
fn make_request(
&self,
transaction_id: Option<&TransactionId>,
node_account_id: AccountId,
) -> crate::Result<(Self::GrpcRequest, Self::Context)>;
/// Sends `request` over `channel` and returns a future for the node's reply.
fn execute(
&self,
channel: Channel,
request: Self::GrpcRequest,
) -> BoxGrpcFuture<Self::GrpcResponse>;
/// Converts a reply whose pre-check status was `Status::Ok` into the final response.
///
/// `context` is the value produced by the matching `make_request` call.
///
/// # Errors
///
/// Any error returned here aborts execution permanently (it is not retried).
fn make_response(
&self,
response: Self::GrpcResponse,
context: Self::Context,
node_account_id: AccountId,
transaction_id: Option<&TransactionId>,
) -> crate::Result<Self::Response>;
/// Builds the error that represents `response` being rejected or retried with the
/// pre-check `status`.
fn make_error_pre_check(
&self,
status: Status,
transaction_id: Option<&TransactionId>,
response: Self::GrpcResponse,
) -> crate::Error;
/// Extracts the raw (numeric) pre-check status code from `response`.
///
/// The retry loop converts the code to a `Status`; a code that does not convert is
/// reported as `Error::ResponseStatusUnrecognized`.
fn response_pre_check_status(response: &Self::GrpcResponse) -> crate::Result<i32>;
}
/// Per-call settings shared by every attempt of a single `execute` invocation.
struct ExecuteContext {
// Account used to generate (and, on `TransactionExpired`, regenerate) transaction IDs.
// `None` means no transaction ID is ever generated by the retry loop.
operator_account_id: Option<AccountId>,
// Network data used for node selection, health tracking and channels.
network: Arc<NetworkData>,
// Backoff policy handed to `crate::retry`.
backoff_config: ExponentialBackoff,
// Upper bound on attempts handed to `crate::retry`.
max_attempts: usize,
// Timeout applied to each individual gRPC call; `None` waits indefinitely.
grpc_timeout: Option<Duration>,
}
/// Executes `executable` against the network configured on `client`.
///
/// Steps, in order:
/// 1. If the client auto-validates checksums, validate the request against the
///    client's ledger ID.
/// 2. Decide which account (if any) may be used to generate transaction IDs.
/// 3. Build the backoff policy from the client's settings, bounded by `timeout`
///    (or the client's request timeout when `timeout` is `None`).
/// 4. Run the retry loop.
///
/// # Errors
///
/// Returns the checksum validation error, or whatever error the retry loop ends with.
///
/// # Panics
///
/// Panics if checksum auto-validation is enabled but the client has no ledger ID.
pub(crate) async fn execute<E>(
    client: &Client,
    executable: &E,
    timeout: Option<Duration>,
) -> crate::Result<E::Response>
where
    E: Execute + Sync,
{
    if client.auto_validate_checksums() {
        let ledger_guard = client.ledger_id_internal();
        let ledger = ledger_guard
            .as_ref()
            .expect("Client had auto_validate_checksums enabled but no ledger ID");
        executable.validate_checksums(ledger.as_ref_ledger_id())?;
    }

    // Transaction IDs may only be generated when the caller did not pin one and
    // regeneration is enabled (per request, falling back to the client default).
    let may_generate = executable.transaction_id().is_none()
        && executable
            .regenerate_transaction_id()
            .unwrap_or(client.default_regenerate_transaction_id());

    let operator_account_id = if may_generate {
        // The request's own operator wins over the client's operator.
        executable
            .operator_account_id()
            .copied()
            .or_else(|| client.load_operator().as_ref().map(|it| it.account_id))
    } else {
        None
    };

    let settings = client.backoff();

    let mut policy = ExponentialBackoffBuilder::new();
    policy
        .with_initial_interval(settings.initial_backoff)
        .with_max_interval(settings.max_backoff);

    // Only override the builder's own elapsed-time limit when a timeout is known.
    if let Some(limit) = timeout.or(settings.request_timeout) {
        policy.with_max_elapsed_time(Some(limit));
    }

    let ctx = ExecuteContext {
        max_attempts: settings.max_attempts,
        backoff_config: policy.build(),
        operator_account_id,
        network: client.net().0.load_full(),
        grpc_timeout: settings.grpc_timeout,
    };

    execute_inner(&ctx, executable).await
}
/// Runs the retry loop for `executable` using the settings in `ctx`.
///
/// Each attempt picks a shuffled set of candidate nodes, tries them one after another via
/// `execute_single`, and either returns the first successful response or reports the last
/// per-node error as transient so that `crate::retry` can back off and try again.
///
/// # Errors
///
/// Returns an error if the explicit node IDs cannot be mapped to node indexes, or the
/// error `crate::retry` ends with.
async fn execute_inner<E>(ctx: &ExecuteContext, executable: &E) -> crate::Result<E::Response>
where
E: Execute + Sync,
{
// Pings the node at `index` by running a `PingQuery` through `execute_inner` itself and
// reports whether it succeeded. The future is boxed because this call is recursive
// (`execute_inner` -> `recurse_ping` -> `execute_inner`).
fn recurse_ping(ctx: &ExecuteContext, index: usize) -> BoxFuture<'_, bool> {
Box::pin(async move {
// Same network and retry settings as the parent, but without an operator account.
let ctx = ExecuteContext {
operator_account_id: None,
network: Arc::clone(&ctx.network),
backoff_config: ctx.backoff_config.clone(),
max_attempts: ctx.max_attempts,
grpc_timeout: ctx.grpc_timeout,
};
let ping_query = PingQuery::new(ctx.network.node_ids()[index]);
execute_inner(&ctx, &ping_query).await.is_ok()
})
}
let backoff = ctx.backoff_config.clone();
let explicit_transaction_id = executable.transaction_id();
// Initial transaction ID: `None` when the request does not need one; otherwise the
// explicit ID, or else a freshly generated one if an operator account is available.
let mut transaction_id = executable
.requires_transaction_id()
.then_some(explicit_transaction_id)
.and_then(|it| it.or_else(|| ctx.operator_account_id.map(TransactionId::generate)));
// Translate explicitly requested node IDs into network indexes; a failed lookup is
// returned to the caller right away.
let explicit_node_indexes = executable
.node_account_ids()
.map(|ids| ctx.network.node_indexes_for_ids(ids))
.transpose()?;
let explicit_node_indexes = explicit_node_indexes.as_deref();
// One retry attempt, invoked by `crate::retry` until it succeeds, fails permanently, or
// the backoff policy / attempt limit is exhausted.
let layer = move || async move {
loop {
let mut last_error: Option<Error> = None;
// No candidate nodes at all is reported as a transient condition.
let random_node_indexes = random_node_indexes(&ctx.network, explicit_node_indexes)
.ok_or(retry::Error::EmptyTransient)?;
// Lazily filter the candidates: explicitly requested nodes and recently pinged nodes
// pass as-is, any other node must first answer a ping.
// NOTE(review): presumably `PingQuery` reports its target via `node_account_ids`, which
// would make the first condition true for pings and keep this recursion bounded — confirm.
let random_node_indexes = {
let random_node_indexes = &random_node_indexes;
let client = ctx;
let now = Instant::now();
futures_util::stream::iter(random_node_indexes.iter().copied()).filter(
move |&node_index| async move {
explicit_node_indexes.is_some()
|| client.network.node_recently_pinged(node_index, now)
|| recurse_ping(client, node_index).await
},
)
};
let mut random_node_indexes = std::pin::pin!(random_node_indexes);
while let Some(node_index) = random_node_indexes.next().await {
let tmp = execute_single(ctx, executable, node_index, &mut transaction_id).await;
// Log level: debug on success, warn when another node or attempt will be tried,
// error on a non-transient failure.
log::log!(
match &tmp {
Ok(ControlFlow::Break(_)) => log::Level::Debug,
Ok(ControlFlow::Continue(_)) => log::Level::Warn,
Err(e) =>
if e.is_transient() {
log::Level::Warn
} else {
log::Level::Error
},
},
"Execution of {} on node at index {node_index} / node id {} {}",
type_name::<E>(),
ctx.network.channel(node_index).0,
match &tmp {
Ok(ControlFlow::Break(_)) => Cow::Borrowed("succeeded"),
Ok(ControlFlow::Continue(err)) =>
format!("will continue due to {err:?}").into(),
Err(err) => format!("failed due to {err:?}").into(),
},
);
// `Err` leaves the attempt immediately (the retry layer decides what happens next);
// `Continue` remembers the error and moves on to the next node; `Break` is success.
match tmp? {
ControlFlow::Continue(err) => last_error = Some(err),
ControlFlow::Break(res) => return Ok(res),
}
}
match last_error {
// Every node that was tried asked us to move on: report the last error as transient.
Some(it) => return Err(retry::Error::Transient(it)),
// No node was tried at all, i.e. every candidate was filtered out above: pick a
// new set of candidates without going through the backoff.
None => continue,
}
}
};
crate::retry(backoff, Some(ctx.max_attempts), layer).await
}
fn map_tonic_error(
status: tonic::Status,
network: &client::NetworkData,
node_index: usize,
request_free: bool,
) -> retry::Error {
const MIME_HTML: &[u8] = b"text/html";
match status.code() {
tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
network.mark_node_unhealthy(node_index);
retry::Error::Transient(status.into())
}
tonic::Code::Internal
if status.metadata().get("content-type").map(AsciiMetadataValue::as_bytes)
== Some(MIME_HTML) =>
{
network.mark_node_unhealthy(node_index);
match request_free {
true => retry::Error::Transient(status.into()),
false => retry::Error::Permanent(status.into()),
}
}
_ if is_tonic_status_transient(&status) => {
network.mark_node_unhealthy(node_index);
retry::Error::Transient(status.into())
}
_ => retry::Error::Permanent(status.into()),
}
}
async fn execute_single<E: Execute + Sync>(
ctx: &ExecuteContext,
executable: &E,
node_index: usize,
transaction_id: &mut Option<TransactionId>,
) -> retry::Result<ControlFlow<E::Response, Error>> {
let (node_account_id, channel) = ctx.network.channel(node_index);
log::debug!(
"Preparing {} on node at index {node_index} / node id {node_account_id}",
type_name::<E>()
);
let (request, context) = executable
.make_request(transaction_id.as_ref(), node_account_id)
.map_err(retry::Error::Permanent)?;
log::debug!(
"Executing {} on node at index {node_index} / node id {node_account_id}",
type_name::<E>()
);
let mut req = Request::new(request);
executable.add_metadata(req.metadata_mut());
let fut = executable.execute(channel, req.into_inner());
let response = match ctx.grpc_timeout {
Some(it) => match tokio::time::timeout(it, fut).await {
Ok(it) => it,
Err(_) => {
return Ok(ControlFlow::Continue(crate::Error::GrpcStatus(
tonic::Status::deadline_exceeded("explicitly given grpc timeout was exceeded"),
)))
}
},
None => fut.await,
};
let response = response.map(tonic::Response::into_inner).map_err(|status| {
map_tonic_error(status, &ctx.network, node_index, transaction_id.is_none())
});
let response = match response {
Ok(response) => response,
Err(retry::Error::Transient(err)) => {
return Ok(ControlFlow::Continue(err));
}
Err(e) => return Err(e),
};
ctx.network.mark_node_healthy(node_index);
let status = E::response_pre_check_status(&response)
.and_then(|status| {
Status::try_from(status).or_else(|_| Err(Error::ResponseStatusUnrecognized(status)))
})
.map_err(retry::Error::Permanent)?;
match status {
Status::Ok if executable.should_retry(&response) => Err(retry::Error::Transient(
executable.make_error_pre_check(status, transaction_id.as_ref(), response),
)),
Status::Ok => executable
.make_response(response, context, node_account_id, transaction_id.as_ref())
.map(ControlFlow::Break)
.map_err(retry::Error::Permanent),
Status::Busy | Status::PlatformNotActive => {
Ok(ControlFlow::Continue(executable.make_error_pre_check(
status,
transaction_id.as_ref(),
response,
)))
}
Status::TransactionExpired if ctx.operator_account_id.is_some() => {
let new = TransactionId::generate(ctx.operator_account_id.unwrap());
*transaction_id = Some(new);
Ok(ControlFlow::Continue(executable.make_error_pre_check(
status,
transaction_id.as_ref(),
response,
)))
}
_ if executable.should_retry_pre_check(status) => {
Err(retry::Error::Transient(executable.make_error_pre_check(
status,
transaction_id.as_ref(),
response,
)))
}
_ => {
Err(retry::Error::Permanent(executable.make_error_pre_check(
status,
transaction_id.as_ref(),
response,
)))
}
}
}
/// Picks the node indexes to try for one attempt, in random order.
///
/// * With `explicit_node_indexes`: the healthy ones among them, or all of them when none
///   is healthy, fully shuffled. Always returns `Some`.
/// * Without: a random third (rounded up) of the currently healthy nodes, or `None`
///   when no node is healthy.
///
/// # Panics
///
/// Panics if `explicit_node_indexes` is `Some` but empty.
fn random_node_indexes(
    network: &client::NetworkData,
    explicit_node_indexes: Option<&[usize]>,
) -> Option<Vec<usize>> {
    // One RNG handle and one timestamp are shared by everything below.
    let mut rng = thread_rng();
    let now = Instant::now();

    match explicit_node_indexes {
        Some(explicit) => {
            let mut candidates = explicit.to_vec();
            candidates.retain(|&index| network.is_node_healthy(index, now));

            // Nothing healthy among the requested nodes: fall back to all of them.
            if candidates.is_empty() {
                candidates.extend_from_slice(explicit);
            }

            assert!(!candidates.is_empty(), "empty explicitly set nodes");

            candidates.shuffle(&mut rng);
            Some(candidates)
        }
        None => {
            let mut healthy: Vec<usize> = network.healthy_node_indexes(now).collect();
            if healthy.is_empty() {
                return None;
            }

            // Ceiling of one third of the healthy nodes.
            let amount = (healthy.len() + 2) / 3;
            let (picked, _) = healthy.partial_shuffle(&mut rng, amount);
            Some(picked.to_vec())
        }
    }
}