1mod error;
3
4use std::any::type_name;
5use std::borrow::Cow;
6use std::ops::ControlFlow;
7use std::time::{
8 Duration,
9 Instant,
10};
11
12use backoff::{
13 ExponentialBackoff,
14 ExponentialBackoffBuilder,
15};
16use futures_core::future::BoxFuture;
17use futures_util::StreamExt;
18use prost::Message;
19use rand::seq::SliceRandom;
20use rand::thread_rng;
21use tonic::metadata::AsciiMetadataValue;
22use tonic::transport::Channel;
23use tonic::Request;
24use triomphe::Arc;
25
26use crate::client::NetworkData;
27use crate::execute::error::is_tonic_status_transient;
28use crate::ping_query::PingQuery;
29use crate::{
30 client,
31 retry,
32 AccountId,
33 BoxGrpcFuture,
34 Client,
35 Error,
36 Status,
37 TransactionId,
38 ValidateChecksums,
39};
40
41pub(crate) trait Execute: ValidateChecksums {
42 type GrpcRequest: Clone + Message;
43
44 type GrpcResponse: Message;
45
46 type Context: Send;
50
51 type Response;
52
53 fn operator_account_id(&self) -> Option<&AccountId>;
57
58 fn node_account_ids(&self) -> Option<&[AccountId]>;
60
61 fn transaction_id(&self) -> Option<TransactionId>;
63
64 fn requires_transaction_id(&self) -> bool;
66
67 fn regenerate_transaction_id(&self) -> Option<bool> {
71 None
72 }
73
74 fn grpc_deadline(&self) -> Option<std::time::Duration> {
78 None
79 }
80
81 fn request_timeout(&self) -> Option<std::time::Duration> {
85 None
86 }
87
88 fn should_retry_pre_check(&self, _status: Status) -> bool {
90 false
91 }
92
93 #[allow(unused_variables)]
95 fn should_retry(&self, response: &Self::GrpcResponse) -> bool {
96 false
97 }
98
99 fn add_metadata(&self, metadata: &mut tonic::metadata::MetadataMap) {
101 let user_agent = format!("hiero-sdk-rust/{}", env!("CARGO_PKG_VERSION"));
102 metadata.insert("x-user-agent", user_agent.parse().unwrap());
103 }
104
105 fn make_request(
110 &self,
111 transaction_id: Option<&TransactionId>,
112 node_account_id: AccountId,
113 ) -> crate::Result<(Self::GrpcRequest, Self::Context)>;
114
115 fn execute(
117 &self,
118 channel: Channel,
119 request: Self::GrpcRequest,
120 ) -> BoxGrpcFuture<Self::GrpcResponse>;
121
122 fn make_response(
125 &self,
126 response: Self::GrpcResponse,
127 context: Self::Context,
128 node_account_id: AccountId,
129 transaction_id: Option<&TransactionId>,
130 ) -> crate::Result<Self::Response>;
131
132 fn make_error_pre_check(
134 &self,
135 status: Status,
136 transaction_id: Option<&TransactionId>,
137 response: Self::GrpcResponse,
138 ) -> crate::Error;
139
140 fn response_pre_check_status(response: &Self::GrpcResponse) -> crate::Result<i32>;
142}
143
144struct ExecuteContext<'a> {
147 operator_account_id: Option<AccountId>,
149 network: Arc<NetworkData>,
150 backoff_config: ExponentialBackoff,
151 max_attempts: usize,
152 grpc_deadline: Duration,
154 client: &'a Client,
156}
157
158pub(crate) async fn execute<E>(
159 client: &Client,
160 executable: &E,
161 timeout: Option<Duration>,
162) -> crate::Result<E::Response>
163where
164 E: Execute + Sync,
165{
166 if client.auto_validate_checksums() {
167 let ledger_id = client.ledger_id_internal();
168 let ledger_id = ledger_id
169 .as_ref()
170 .expect("Client had auto_validate_checksums enabled but no ledger ID");
171
172 executable.validate_checksums(ledger_id.as_ref_ledger_id())?;
173 }
174
175 let operator_account_id = 'op: {
176 if executable.transaction_id().is_some()
177 || !executable
178 .regenerate_transaction_id()
179 .unwrap_or(client.default_regenerate_transaction_id())
180 {
181 break 'op None;
182 }
183
184 executable
185 .operator_account_id()
186 .copied()
187 .or_else(|| client.load_operator().as_ref().map(|it| it.account_id))
188 };
189
190 let backoff = client.backoff();
191 let mut backoff_builder = ExponentialBackoffBuilder::new();
192
193 backoff_builder
194 .with_initial_interval(backoff.initial_backoff)
195 .with_max_interval(backoff.max_backoff);
196
197 let request_timeout = executable
203 .request_timeout()
204 .or(timeout)
205 .or(backoff.request_timeout)
206 .unwrap_or(client::DEFAULT_REQUEST_TIMEOUT);
207 backoff_builder.with_max_elapsed_time(Some(request_timeout));
208
209 let grpc_deadline = executable.grpc_deadline().unwrap_or(backoff.grpc_deadline);
211
212 execute_inner(
213 &ExecuteContext {
214 max_attempts: backoff.max_attempts,
215 backoff_config: backoff_builder.build(),
216 operator_account_id,
217 network: client.net().0.load_full(),
218 grpc_deadline,
219 client,
220 },
221 executable,
222 )
223 .await
224}
225
226async fn execute_inner<'a, E>(
227 ctx: &ExecuteContext<'a>,
228 executable: &E,
229) -> crate::Result<E::Response>
230where
231 E: Execute + Sync,
232{
233 fn recurse_ping<'a, 'b: 'a>(ctx: &'b ExecuteContext<'a>, index: usize) -> BoxFuture<'b, bool> {
234 Box::pin(async move {
235 let ctx = ExecuteContext {
236 operator_account_id: None,
237 network: Arc::clone(&ctx.network),
238 backoff_config: ctx.backoff_config.clone(),
239 max_attempts: ctx.max_attempts,
240 client: ctx.client,
241 grpc_deadline: ctx.grpc_deadline,
242 };
243 let ping_query = PingQuery::new(ctx.network.node_ids()[index]);
244
245 execute_inner(&ctx, &ping_query).await.is_ok()
246 })
247 }
248
249 let backoff = ctx.backoff_config.clone();
251
252 let explicit_transaction_id = executable.transaction_id();
257 let mut transaction_id = executable
258 .requires_transaction_id()
259 .then_some(explicit_transaction_id)
260 .and_then(|it| it.or_else(|| ctx.operator_account_id.map(TransactionId::generate)));
261
262 let explicit_node_indexes = executable
265 .node_account_ids()
266 .map(|ids| ctx.network.node_indexes_for_ids(ids))
267 .transpose()?;
268
269 let explicit_node_indexes = explicit_node_indexes.as_deref();
270
271 let layer = move || async move {
272 loop {
273 let mut last_error: Option<Error> = None;
274
275 let random_node_indexes = random_node_indexes(&ctx.network, explicit_node_indexes)
276 .ok_or(retry::Error::EmptyTransient)?;
277
278 let random_node_indexes = {
279 let random_node_indexes = &random_node_indexes;
280 let client = ctx;
281 let now = Instant::now();
282 futures_util::stream::iter(random_node_indexes.iter().copied()).filter(
283 move |&node_index| async move {
284 explicit_node_indexes.is_some()
286 || client.network.node_recently_pinged(node_index, now)
287 || recurse_ping(client, node_index).await
288 },
289 )
290 };
291
292 let mut random_node_indexes = std::pin::pin!(random_node_indexes);
293
294 while let Some(node_index) = random_node_indexes.next().await {
295 let tmp = execute_single(ctx, executable, node_index, &mut transaction_id).await;
296
297 log::log!(
298 match &tmp {
299 Ok(ControlFlow::Break(_)) => log::Level::Debug,
300 Ok(ControlFlow::Continue(_)) => log::Level::Warn,
301 Err(e) =>
302 if e.is_transient() {
303 log::Level::Warn
304 } else {
305 log::Level::Error
306 },
307 },
308 "Execution of {} on node at index {node_index} / node id {} {}",
309 type_name::<E>(),
310 ctx.network.channel(node_index, ctx.grpc_deadline).0,
311 match &tmp {
312 Ok(ControlFlow::Break(_)) => Cow::Borrowed("succeeded"),
313 Ok(ControlFlow::Continue(err)) =>
314 format!("will continue due to {err:?}").into(),
315 Err(err) => format!("failed due to {err:?}").into(),
316 },
317 );
318
319 match tmp? {
320 ControlFlow::Continue(err) => last_error = Some(err),
321 ControlFlow::Break(res) => return Ok(res),
322 }
323 }
324
325 match last_error {
326 Some(it) => return Err(retry::Error::Transient(it)),
327 None => continue,
329 }
330 }
331 };
332
333 crate::retry(backoff, Some(ctx.max_attempts), layer).await
337}
338
339fn map_tonic_error(
340 status: tonic::Status,
341 network: &client::NetworkData,
342 node_index: usize,
343 request_free: bool,
344) -> retry::Error {
345 const MIME_HTML: &[u8] = b"text/html";
346
347 match status.code() {
348 tonic::Code::Unavailable | tonic::Code::ResourceExhausted => {
350 network.mark_node_unhealthy(node_index);
352
353 retry::Error::Transient(status.into())
355 }
356
357 tonic::Code::Internal
363 if status.metadata().get("content-type").map(AsciiMetadataValue::as_bytes)
364 == Some(MIME_HTML) =>
365 {
366 network.mark_node_unhealthy(node_index);
367
368 match request_free {
371 true => retry::Error::Transient(status.into()),
372 false => retry::Error::Permanent(status.into()),
373 }
374 }
375
376 _ if is_tonic_status_transient(&status) => {
377 network.mark_node_unhealthy(node_index);
378
379 retry::Error::Transient(status.into())
380 }
381
382 _ => retry::Error::Permanent(status.into()),
384 }
385}
386
387async fn execute_single<'a, E: Execute + Sync>(
388 ctx: &ExecuteContext<'a>,
389 executable: &E,
390 node_index: usize,
391 transaction_id: &mut Option<TransactionId>,
392) -> retry::Result<ControlFlow<E::Response, Error>> {
393 let (node_account_id, channel) = ctx.network.channel(node_index, ctx.grpc_deadline);
394
395 log::debug!(
396 "Preparing {} on node at index {node_index} / node id {node_account_id}",
397 type_name::<E>()
398 );
399
400 let (request, context) = executable
401 .make_request(transaction_id.as_ref(), node_account_id)
402 .map_err(retry::Error::Permanent)?;
404
405 log::debug!(
406 "Executing {} on node at index {node_index} / node id {node_account_id}",
407 type_name::<E>()
408 );
409
410 let mut req = Request::new(request);
411 executable.add_metadata(req.metadata_mut());
412
413 let fut = executable.execute(channel, req.into_inner());
414
415 let response = match tokio::time::timeout(ctx.grpc_deadline, fut).await {
416 Ok(it) => it,
417 Err(_) => {
418 return Ok(ControlFlow::Continue(crate::Error::GrpcStatus(
419 tonic::Status::deadline_exceeded("grpc deadline was exceeded"),
420 )))
421 }
422 };
423
424 let response = response.map(tonic::Response::into_inner).map_err(|status| {
425 map_tonic_error(status, &ctx.network, node_index, transaction_id.is_none())
426 });
427
428 let response = match response {
429 Ok(response) => response,
430 Err(retry::Error::Transient(err)) => {
431 return Ok(ControlFlow::Continue(err));
432 }
433
434 Err(e) => return Err(e),
435 };
436
437 ctx.network.mark_node_healthy(node_index);
439
440 let status = E::response_pre_check_status(&response)
441 .and_then(|status| {
442 Status::try_from(status).or_else(|_| Err(Error::ResponseStatusUnrecognized(status)))
444 })
445 .map_err(retry::Error::Permanent)?;
446
447 match status {
448 Status::Ok if executable.should_retry(&response) => Err(retry::Error::Transient(
449 executable.make_error_pre_check(status, transaction_id.as_ref(), response),
450 )),
451
452 Status::Ok => executable
453 .make_response(response, context, node_account_id, transaction_id.as_ref())
454 .map(ControlFlow::Break)
455 .map_err(retry::Error::Permanent),
456
457 Status::Busy | Status::PlatformNotActive => {
458 Ok(ControlFlow::Continue(executable.make_error_pre_check(
461 status,
462 transaction_id.as_ref(),
463 response,
464 )))
465 }
466
467 Status::TransactionExpired if ctx.operator_account_id.is_some() => {
469 let new = TransactionId::generate(ctx.operator_account_id.unwrap());
473
474 *transaction_id = Some(new);
475
476 Ok(ControlFlow::Continue(executable.make_error_pre_check(
477 status,
478 transaction_id.as_ref(),
479 response,
480 )))
481 }
482
483 Status::InvalidNodeAccount => {
484 ctx.network.mark_node_unhealthy(node_index);
488
489 log::warn!(
490 "Node at index {node_index} / node id {node_account_id} returned {status:?}, marking unhealthy. Updating address book before retry."
491 );
492
493 if !ctx.client.mirror_network().is_empty() {
495 ctx.client.refresh_network().await;
496 log::info!("Address book updated");
497 log::info!("network: {:?}", ctx.client.network());
498 } else {
499 log::warn!(
500 "Cannot update address book: no mirror network configured. Retrying with existing network configuration."
501 );
502 }
503
504 Err(retry::Error::Transient(executable.make_error_pre_check(
505 status,
506 transaction_id.as_ref(),
507 response,
508 )))
509 }
510
511 _ if executable.should_retry_pre_check(status) => {
512 Err(retry::Error::Transient(executable.make_error_pre_check(
514 status,
515 transaction_id.as_ref(),
516 response,
517 )))
518 }
519
520 _ => {
521 Err(retry::Error::Permanent(executable.make_error_pre_check(
523 status,
524 transaction_id.as_ref(),
525 response,
526 )))
527 }
528 }
529}
530
531fn random_node_indexes(
533 network: &client::NetworkData,
534 explicit_node_indexes: Option<&[usize]>,
535) -> Option<Vec<usize>> {
536 let mut rng = thread_rng();
539 let now = Instant::now();
540
541 if let Some(indexes) = explicit_node_indexes {
542 let tmp: Vec<_> =
543 indexes.iter().copied().filter(|index| network.is_node_healthy(*index, now)).collect();
544
545 let mut indexes = if tmp.is_empty() { indexes.to_vec() } else { tmp };
546
547 assert!(!indexes.is_empty(), "empty explicitly set nodes");
548
549 indexes.shuffle(&mut rng);
550
551 return Some(indexes);
552 }
553
554 {
555 let mut indexes: Vec<_> = network.healthy_node_indexes(now).collect();
556
557 if indexes.is_empty() {
558 return None;
559 }
560
561 let amount = (indexes.len() + 2) / 3;
563
564 let (shuffled, _) = indexes.partial_shuffle(&mut rng, amount);
565
566 Some(shuffled.to_vec())
567 }
568}