hedera/
execute.rs

1// SPDX-License-Identifier: Apache-2.0
2mod error;
3
4use std::any::type_name;
5use std::borrow::Cow;
6use std::ops::ControlFlow;
7use std::time::{
8    Duration,
9    Instant,
10};
11
12use backoff::{
13    ExponentialBackoff,
14    ExponentialBackoffBuilder,
15};
16use futures_core::future::BoxFuture;
17use futures_util::StreamExt;
18use prost::Message;
19use rand::seq::SliceRandom;
20use rand::thread_rng;
21use tonic::metadata::AsciiMetadataValue;
22use tonic::transport::Channel;
23use tonic::Request;
24use triomphe::Arc;
25
26use crate::client::NetworkData;
27use crate::execute::error::is_tonic_status_transient;
28use crate::ping_query::PingQuery;
29use crate::{
30    client,
31    retry,
32    AccountId,
33    BoxGrpcFuture,
34    Client,
35    Error,
36    Status,
37    TransactionId,
38    ValidateChecksums,
39};
40
41pub(crate) trait Execute: ValidateChecksums {
42    type GrpcRequest: Clone + Message;
43
44    type GrpcResponse: Message;
45
46    /// Additional context returned from each call to `make_request`. Upon
47    /// a successful request, the associated response context is passed to
48    /// `make_response`.
49    type Context: Send;
50
51    type Response;
52
53    /// Account ID to be used for generating transaction IDs.
54    ///
55    /// This is only used `self.requires_transaction` and `self.transaction_id.is_none()`.
56    fn operator_account_id(&self) -> Option<&AccountId>;
57
58    /// Get the _explicit_ nodes that this request will be submitted to.
59    fn node_account_ids(&self) -> Option<&[AccountId]>;
60
61    /// Get the _explicit_ transaction ID that this request will use.
62    fn transaction_id(&self) -> Option<TransactionId>;
63
64    /// Get whether to generate transaction IDs for request creation.
65    fn requires_transaction_id(&self) -> bool;
66
67    /// Returns whether to regenerate transaction IDs for request creation.
68    ///
69    /// Transaction ID regeneration only can happen when `transaction_id` is None and `requires_transaction_id` is true.
70    fn regenerate_transaction_id(&self) -> Option<bool> {
71        None
72    }
73
74    /// Returns the gRPC deadline for this request.
75    ///
76    /// If set, this overrides the client's default grpc_deadline.
77    fn grpc_deadline(&self) -> Option<std::time::Duration> {
78        None
79    }
80
81    /// Returns the request timeout for this request (including retries).
82    ///
83    /// If set, this takes priority over the timeout passed to execute methods and the client's request_timeout.
84    fn request_timeout(&self) -> Option<std::time::Duration> {
85        None
86    }
87
88    /// Check whether to retry an pre-check status.
89    fn should_retry_pre_check(&self, _status: Status) -> bool {
90        false
91    }
92
93    /// Check whether we should retry an otherwise successful response.
94    #[allow(unused_variables)]
95    fn should_retry(&self, response: &Self::GrpcResponse) -> bool {
96        false
97    }
98
99    /// Add metadata to the request.
100    fn add_metadata(&self, metadata: &mut tonic::metadata::MetadataMap) {
101        let user_agent = format!("hiero-sdk-rust/{}", env!("CARGO_PKG_VERSION"));
102        metadata.insert("x-user-agent", user_agent.parse().unwrap());
103    }
104
105    /// Create a new request for execution.
106    ///
107    /// A created request is cached per node until any request returns
108    /// `TransactionExpired`; in which case, the request cache is cleared.
109    fn make_request(
110        &self,
111        transaction_id: Option<&TransactionId>,
112        node_account_id: AccountId,
113    ) -> crate::Result<(Self::GrpcRequest, Self::Context)>;
114
115    /// Execute the created GRPC request against the provided GRPC channel.
116    fn execute(
117        &self,
118        channel: Channel,
119        request: Self::GrpcRequest,
120    ) -> BoxGrpcFuture<Self::GrpcResponse>;
121
122    /// Create a response from the GRPC response and the saved transaction
123    /// and node account ID from the successful request.
124    fn make_response(
125        &self,
126        response: Self::GrpcResponse,
127        context: Self::Context,
128        node_account_id: AccountId,
129        transaction_id: Option<&TransactionId>,
130    ) -> crate::Result<Self::Response>;
131
132    /// Create an error from the given pre-check status.
133    fn make_error_pre_check(
134        &self,
135        status: Status,
136        transaction_id: Option<&TransactionId>,
137        response: Self::GrpcResponse,
138    ) -> crate::Error;
139
140    /// Extract the pre-check status from the GRPC response.
141    fn response_pre_check_status(response: &Self::GrpcResponse) -> crate::Result<i32>;
142}
143
144/// The lifetime `'a` represents the lifetime of the borrowed `Client` reference.
145/// This ensures the context cannot outlive the client it references.
146struct ExecuteContext<'a> {
147    // When `Some` the `transaction_id` will be regenerated when expired.
148    operator_account_id: Option<AccountId>,
149    network: Arc<NetworkData>,
150    backoff_config: ExponentialBackoff,
151    max_attempts: usize,
152    // timeout for a single grpc request.
153    grpc_deadline: Duration,
154    // Reference to the client for triggering network updates
155    client: &'a Client,
156}
157
158pub(crate) async fn execute<E>(
159    client: &Client,
160    executable: &E,
161    timeout: Option<Duration>,
162) -> crate::Result<E::Response>
163where
164    E: Execute + Sync,
165{
166    if client.auto_validate_checksums() {
167        let ledger_id = client.ledger_id_internal();
168        let ledger_id = ledger_id
169            .as_ref()
170            .expect("Client had auto_validate_checksums enabled but no ledger ID");
171
172        executable.validate_checksums(ledger_id.as_ref_ledger_id())?;
173    }
174
175    let operator_account_id = 'op: {
176        if executable.transaction_id().is_some()
177            || !executable
178                .regenerate_transaction_id()
179                .unwrap_or(client.default_regenerate_transaction_id())
180        {
181            break 'op None;
182        }
183
184        executable
185            .operator_account_id()
186            .copied()
187            .or_else(|| client.load_operator().as_ref().map(|it| it.account_id))
188    };
189
190    let backoff = client.backoff();
191    let mut backoff_builder = ExponentialBackoffBuilder::new();
192
193    backoff_builder
194        .with_initial_interval(backoff.initial_backoff)
195        .with_max_interval(backoff.max_backoff);
196
197    // Timeout priority (matching JS SDK behavior):
198    // 1. Transaction's request_timeout field (if set)
199    // 2. Parameter timeout passed to execute
200    // 3. Client's request_timeout
201    // 4. Default DEFAULT_REQUEST_TIMEOUT
202    let request_timeout = executable
203        .request_timeout()
204        .or(timeout)
205        .or(backoff.request_timeout)
206        .unwrap_or(client::DEFAULT_REQUEST_TIMEOUT);
207    backoff_builder.with_max_elapsed_time(Some(request_timeout));
208
209    // Use transaction's grpc_deadline if set, otherwise use client's default
210    let grpc_deadline = executable.grpc_deadline().unwrap_or(backoff.grpc_deadline);
211
212    execute_inner(
213        &ExecuteContext {
214            max_attempts: backoff.max_attempts,
215            backoff_config: backoff_builder.build(),
216            operator_account_id,
217            network: client.net().0.load_full(),
218            grpc_deadline,
219            client,
220        },
221        executable,
222    )
223    .await
224}
225
226async fn execute_inner<'a, E>(
227    ctx: &ExecuteContext<'a>,
228    executable: &E,
229) -> crate::Result<E::Response>
230where
231    E: Execute + Sync,
232{
233    fn recurse_ping<'a, 'b: 'a>(ctx: &'b ExecuteContext<'a>, index: usize) -> BoxFuture<'b, bool> {
234        Box::pin(async move {
235            let ctx = ExecuteContext {
236                operator_account_id: None,
237                network: Arc::clone(&ctx.network),
238                backoff_config: ctx.backoff_config.clone(),
239                max_attempts: ctx.max_attempts,
240                client: ctx.client,
241                grpc_deadline: ctx.grpc_deadline,
242            };
243            let ping_query = PingQuery::new(ctx.network.node_ids()[index]);
244
245            execute_inner(&ctx, &ping_query).await.is_ok()
246        })
247    }
248
249    // the overall timeout for the backoff starts measuring from here
250    let backoff = ctx.backoff_config.clone();
251
252    // TODO: cache requests to avoid signing a new request for every node in a delayed back-off
253
254    // if we need to generate a transaction ID for this request (and one was not provided),
255    // generate one now
256    let explicit_transaction_id = executable.transaction_id();
257    let mut transaction_id = executable
258        .requires_transaction_id()
259        .then_some(explicit_transaction_id)
260        .and_then(|it| it.or_else(|| ctx.operator_account_id.map(TransactionId::generate)));
261
262    // if we were explicitly given a list of nodes to use, we iterate through each
263    // of the given nodes (in a random order)
264    let explicit_node_indexes = executable
265        .node_account_ids()
266        .map(|ids| ctx.network.node_indexes_for_ids(ids))
267        .transpose()?;
268
269    let explicit_node_indexes = explicit_node_indexes.as_deref();
270
271    let layer = move || async move {
272        loop {
273            let mut last_error: Option<Error> = None;
274
275            let random_node_indexes = random_node_indexes(&ctx.network, explicit_node_indexes)
276                .ok_or(retry::Error::EmptyTransient)?;
277
278            let random_node_indexes = {
279                let random_node_indexes = &random_node_indexes;
280                let client = ctx;
281                let now = Instant::now();
282                futures_util::stream::iter(random_node_indexes.iter().copied()).filter(
283                    move |&node_index| async move {
284                        // NOTE: For pings we're relying on the fact that they have an explict node index.
285                        explicit_node_indexes.is_some()
286                            || client.network.node_recently_pinged(node_index, now)
287                            || recurse_ping(client, node_index).await
288                    },
289                )
290            };
291
292            let mut random_node_indexes = std::pin::pin!(random_node_indexes);
293
294            while let Some(node_index) = random_node_indexes.next().await {
295                let tmp = execute_single(ctx, executable, node_index, &mut transaction_id).await;
296
297                log::log!(
298                    match &tmp {
299                        Ok(ControlFlow::Break(_)) => log::Level::Debug,
300                        Ok(ControlFlow::Continue(_)) => log::Level::Warn,
301                        Err(e) =>
302                            if e.is_transient() {
303                                log::Level::Warn
304                            } else {
305                                log::Level::Error
306                            },
307                    },
308                    "Execution of {} on node at index {node_index} / node id {} {}",
309                    type_name::<E>(),
310                    ctx.network.channel(node_index, ctx.grpc_deadline).0,
311                    match &tmp {
312                        Ok(ControlFlow::Break(_)) => Cow::Borrowed("succeeded"),
313                        Ok(ControlFlow::Continue(err)) =>
314                            format!("will continue due to {err:?}").into(),
315                        Err(err) => format!("failed due to {err:?}").into(),
316                    },
317                );
318
319                match tmp? {
320                    ControlFlow::Continue(err) => last_error = Some(err),
321                    ControlFlow::Break(res) => return Ok(res),
322                }
323            }
324
325            match last_error {
326                Some(it) => return Err(retry::Error::Transient(it)),
327                // this can only happen if we skipped every node due to pinging it coming up `false` (unhealthy)... The node will be marked as unhealthy, soo
328                None => continue,
329            }
330        }
331    };
332
333    // the outer loop continues until we timeout or reach the maximum number of "attempts"
334    // an attempt is counted when we have a successful response from a node that must either
335    // be retried immediately (on a new node) or retried after a backoff.
336    crate::retry(backoff, Some(ctx.max_attempts), layer).await
337}
338
339fn map_tonic_error(
340    status: tonic::Status,
341    network: &client::NetworkData,
342    node_index: usize,
343    request_free: bool,
344) -> retry::Error {
345    const MIME_HTML: &[u8] = b"text/html";
346
347    match status.code() {
348        // if the node says it isn't available, then we should just try again with a different node.
349        tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
350            // NOTE: this is an "unhealthy" node
351            network.mark_node_unhealthy(node_index);
352
353            // try the next node in our allowed list, immediately
354            retry::Error::Transient(status.into())
355        }
356
357        // todo: find a way to make this less fragile
358        // hack:
359        // if this happens:
360        // the node is completely borked (we're probably seeing the load balancer's response),
361        // and we have no clue if the effect went through
362        tonic::Code::Internal
363            if status.metadata().get("content-type").map(AsciiMetadataValue::as_bytes)
364                == Some(MIME_HTML) =>
365        {
366            network.mark_node_unhealthy(node_index);
367
368            // hack to the hack:
369            // if this is a free request let's try retrying it anyway...
370            match request_free {
371                true => retry::Error::Transient(status.into()),
372                false => retry::Error::Permanent(status.into()),
373            }
374        }
375
376        _ if is_tonic_status_transient(&status) => {
377            network.mark_node_unhealthy(node_index);
378
379            retry::Error::Transient(status.into())
380        }
381
382        // fail immediately
383        _ => retry::Error::Permanent(status.into()),
384    }
385}
386
387async fn execute_single<'a, E: Execute + Sync>(
388    ctx: &ExecuteContext<'a>,
389    executable: &E,
390    node_index: usize,
391    transaction_id: &mut Option<TransactionId>,
392) -> retry::Result<ControlFlow<E::Response, Error>> {
393    let (node_account_id, channel) = ctx.network.channel(node_index, ctx.grpc_deadline);
394
395    log::debug!(
396        "Preparing {} on node at index {node_index} / node id {node_account_id}",
397        type_name::<E>()
398    );
399
400    let (request, context) = executable
401        .make_request(transaction_id.as_ref(), node_account_id)
402        // Does not represent a network error or error returned by a node
403        .map_err(retry::Error::Permanent)?;
404
405    log::debug!(
406        "Executing {} on node at index {node_index} / node id {node_account_id}",
407        type_name::<E>()
408    );
409
410    let mut req = Request::new(request);
411    executable.add_metadata(req.metadata_mut());
412
413    let fut = executable.execute(channel, req.into_inner());
414
415    let response = match tokio::time::timeout(ctx.grpc_deadline, fut).await {
416        Ok(it) => it,
417        Err(_) => {
418            return Ok(ControlFlow::Continue(crate::Error::GrpcStatus(
419                tonic::Status::deadline_exceeded("grpc deadline was exceeded"),
420            )))
421        }
422    };
423
424    let response = response.map(tonic::Response::into_inner).map_err(|status| {
425        map_tonic_error(status, &ctx.network, node_index, transaction_id.is_none())
426    });
427
428    let response = match response {
429        Ok(response) => response,
430        Err(retry::Error::Transient(err)) => {
431            return Ok(ControlFlow::Continue(err));
432        }
433
434        Err(e) => return Err(e),
435    };
436
437    // at this point, any failure isn't from the node, it's from the request.
438    ctx.network.mark_node_healthy(node_index);
439
440    let status = E::response_pre_check_status(&response)
441        .and_then(|status| {
442            // not sure how to proceed, fail immediately
443            Status::try_from(status).or_else(|_| Err(Error::ResponseStatusUnrecognized(status)))
444        })
445        .map_err(retry::Error::Permanent)?;
446
447    match status {
448        Status::Ok if executable.should_retry(&response) => Err(retry::Error::Transient(
449            executable.make_error_pre_check(status, transaction_id.as_ref(), response),
450        )),
451
452        Status::Ok => executable
453            .make_response(response, context, node_account_id, transaction_id.as_ref())
454            .map(ControlFlow::Break)
455            .map_err(retry::Error::Permanent),
456
457        Status::Busy | Status::PlatformNotActive => {
458            // NOTE: this is a "busy" node
459            // try the next node in our allowed list, immediately
460            Ok(ControlFlow::Continue(executable.make_error_pre_check(
461                status,
462                transaction_id.as_ref(),
463                response,
464            )))
465        }
466
467        // would do an `if_let` but, not stable ._.
468        Status::TransactionExpired if ctx.operator_account_id.is_some() => {
469            // the transaction that was generated has since expired
470            // re-generate the transaction ID and try again, immediately
471
472            let new = TransactionId::generate(ctx.operator_account_id.unwrap());
473
474            *transaction_id = Some(new);
475
476            Ok(ControlFlow::Continue(executable.make_error_pre_check(
477                status,
478                transaction_id.as_ref(),
479                response,
480            )))
481        }
482
483        Status::InvalidNodeAccount => {
484            // The node account is invalid or doesn't match the submitted node
485            // Mark the node as unhealthy and retry with backoff
486            // This typically indicates the address book is out of date
487            ctx.network.mark_node_unhealthy(node_index);
488
489            log::warn!(
490                "Node at index {node_index} / node id {node_account_id} returned {status:?}, marking unhealthy. Updating address book before retry."
491            );
492
493            // Update the network address book before retrying, but only if mirror network is configured
494            if !ctx.client.mirror_network().is_empty() {
495                ctx.client.refresh_network().await;
496                log::info!("Address book updated");
497                log::info!("network: {:?}", ctx.client.network());
498            } else {
499                log::warn!(
500                    "Cannot update address book: no mirror network configured. Retrying with existing network configuration."
501                );
502            }
503
504            Err(retry::Error::Transient(executable.make_error_pre_check(
505                status,
506                transaction_id.as_ref(),
507                response,
508            )))
509        }
510
511        _ if executable.should_retry_pre_check(status) => {
512            // conditional retry on pre-check should back-off and try again
513            Err(retry::Error::Transient(executable.make_error_pre_check(
514                status,
515                transaction_id.as_ref(),
516                response,
517            )))
518        }
519
520        _ => {
521            // any other pre-check is an error that the user needs to fix, fail immediately
522            Err(retry::Error::Permanent(executable.make_error_pre_check(
523                status,
524                transaction_id.as_ref(),
525                response,
526            )))
527        }
528    }
529}
530
531// todo: return an iterator.
532fn random_node_indexes(
533    network: &client::NetworkData,
534    explicit_node_indexes: Option<&[usize]>,
535) -> Option<Vec<usize>> {
536    // cache the rng impl and "now" because `thread_rng` is TLS (a thread local),
537    // and because using the same reference time avoids situations where a node that wasn't available becomes available.
538    let mut rng = thread_rng();
539    let now = Instant::now();
540
541    if let Some(indexes) = explicit_node_indexes {
542        let tmp: Vec<_> =
543            indexes.iter().copied().filter(|index| network.is_node_healthy(*index, now)).collect();
544
545        let mut indexes = if tmp.is_empty() { indexes.to_vec() } else { tmp };
546
547        assert!(!indexes.is_empty(), "empty explicitly set nodes");
548
549        indexes.shuffle(&mut rng);
550
551        return Some(indexes);
552    }
553
554    {
555        let mut indexes: Vec<_> = network.healthy_node_indexes(now).collect();
556
557        if indexes.is_empty() {
558            return None;
559        }
560
561        // would put this inline, but borrowck wouldn't allow that.
562        let amount = (indexes.len() + 2) / 3;
563
564        let (shuffled, _) = indexes.partial_shuffle(&mut rng, amount);
565
566        Some(shuffled.to_vec())
567    }
568}