casper_node/
effect.rs

//! Effects subsystem.
//!
//! Effects describe things that the creator of the effect intends to happen, producing a value upon
//! completion (they actually are boxed futures).
//!
//! A pinned, boxed future returning an event is called an effect and typed as an `Effect<Ev>`,
//! where `Ev` is the event's type. Every effect must have its return value either wrapped in an
//! event through [`EffectExt::event`](EffectExt::event) or ignored using
//! [`EffectExt::ignore`](EffectExt::ignore). As an example, the
//! [`handle_event`](crate::components::Component::handle_event) function of a component always
//! returns `Effect<Self::Event>`.
//!
//! # A primer on events
//!
//! There are three distinct groups of events found around the node:
//!
//! * (unbound) events: These events are not associated with a particular reactor or component and
//!   represent information or requests by themselves. An example is the
//!   [`PeerBehaviorAnnouncement`](`crate::effect::announcements::PeerBehaviorAnnouncement`), which
//!   can be emitted through an effect by different components and contains the ID of a peer that
//!   should be shunned. It is not associated with a particular reactor or component though.
//!
//!   While the node is running, these unbound events cannot exist on their own; instead, they are
//!   typically converted into a concrete reactor event by the effect builder as soon as they are
//!   created.
//!
//! * reactor events: A running reactor has a single event type that encompasses all possible
//!   unbound events that can occur during its operation and all component events of components it
//!   is made of. Usually they are implemented as one large `enum` with only newtype-variants.
//!
//! * component events: Every component defines its own set of events, typically for internal use.
//!   If the component is able to process unbound events like announcements or requests, it will
//!   have a `From` implementation that allows converting them into a suitable component event.
//!
//!   Component events are also created from the return values of effects: While effects do not
//!   return events themselves when called, their return values are first turned into component
//!   events through the [`event`](EffectExt) method. In a second step, inside the reactor's
//!   routing code, `wrap_effect` will then convert from component to reactor event.
//!
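//! The layering above can be sketched as follows. This is a minimal, self-contained illustration
//! rather than code from this crate: `PeerShunned`, `NetworkEvent`, `ReactorEvent` and
//! `route_to_network` are hypothetical names, and the routing shown is only an assumption about
//! how a reactor might forward an unbound event to a component.
//!
//! ```rust
//! /// Hypothetical unbound event: not tied to any reactor or component.
//! #[derive(Debug, PartialEq)]
//! struct PeerShunned {
//!     peer_id: u64,
//! }
//!
//! /// Hypothetical component event of a networking component.
//! #[derive(Debug, PartialEq)]
//! enum NetworkEvent {
//!     /// Created from the unbound event via `From`.
//!     Shun(PeerShunned),
//! }
//!
//! impl From<PeerShunned> for NetworkEvent {
//!     fn from(announcement: PeerShunned) -> Self {
//!         NetworkEvent::Shun(announcement)
//!     }
//! }
//!
//! /// Hypothetical reactor event: one `enum` with only newtype variants.
//! #[derive(Debug, PartialEq)]
//! enum ReactorEvent {
//!     Network(NetworkEvent),
//!     PeerShunned(PeerShunned),
//! }
//!
//! impl From<PeerShunned> for ReactorEvent {
//!     fn from(announcement: PeerShunned) -> Self {
//!         ReactorEvent::PeerShunned(announcement)
//!     }
//! }
//!
//! /// Converts a reactor event into the component event the networking component handles.
//! fn route_to_network(event: ReactorEvent) -> NetworkEvent {
//!     match event {
//!         ReactorEvent::Network(component_event) => component_event,
//!         ReactorEvent::PeerShunned(announcement) => announcement.into(),
//!     }
//! }
//! ```
//!
//! In this sketch, `PeerShunned { peer_id: 7 }.into()` yields a `ReactorEvent`, which
//! `route_to_network` then turns into `NetworkEvent::Shun`.
//!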
//! # Using effects
//!
//! To create an effect, an `EffectBuilder` will be passed in by the calling reactor runner. For
//! example, given an effect builder `effect_builder`, we can create a `set_timeout` future and turn
//! it into an effect:
//!
//! ```ignore
//! use std::time::Duration;
//! use casper_node::effect::EffectExt;
//!
//! // Note: This is our "component" event.
//! enum Event {
//!     ThreeSecondsElapsed(Duration),
//! }
//!
//! effect_builder
//!     .set_timeout(Duration::from_secs(3))
//!     .event(Event::ThreeSecondsElapsed);
//! ```
//!
//! This example will produce an effect that, after three seconds, creates an
//! `Event::ThreeSecondsElapsed`. Note that effects do nothing on their own; they need to be passed
//! to a [`reactor`](../reactor/index.html) to be executed.
//!
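//! To illustrate the idea behind `event` without the rest of the node, here is a minimal sketch
//! using only the standard library. It is not the actual implementation: `SketchEffect`,
//! `into_effect`, `NoopWaker` and `poll_once` are hypothetical names, the effect yields a single
//! event rather than a `Multiple`, and a single manual poll stands in for the reactor.
//!
//! ```rust
//! use std::future::Future;
//! use std::pin::Pin;
//! use std::sync::Arc;
//! use std::task::{Context, Poll, Wake, Waker};
//! use std::time::Duration;
//!
//! /// Simplified stand-in for `Effect<Ev>`: a pinned, boxed future yielding one event.
//! type SketchEffect<Ev> = Pin<Box<dyn Future<Output = Ev> + Send + 'static>>;
//!
//! /// Hypothetical component event.
//! #[derive(Debug, PartialEq)]
//! enum Event {
//!     TimeoutElapsed(Duration),
//! }
//!
//! /// Wraps the output of `future` into an event using `f`.
//! fn into_effect<Fut, Ev, F>(future: Fut, f: F) -> SketchEffect<Ev>
//! where
//!     Fut: Future + Send + 'static,
//!     Fut::Output: Send,
//!     F: FnOnce(Fut::Output) -> Ev + Send + 'static,
//!     Ev: 'static,
//! {
//!     Box::pin(async move { f(future.await) })
//! }
//!
//! /// Waker that does nothing; sufficient for futures that are ready on the first poll.
//! struct NoopWaker;
//!
//! impl Wake for NoopWaker {
//!     fn wake(self: Arc<Self>) {}
//! }
//!
//! /// Polls an effect once, returning its event if it has already completed.
//! fn poll_once<Ev>(effect: &mut SketchEffect<Ev>) -> Option<Ev> {
//!     let waker = Waker::from(Arc::new(NoopWaker));
//!     let mut context = Context::from_waker(&waker);
//!     match effect.as_mut().poll(&mut context) {
//!         Poll::Ready(event) => Some(event),
//!         Poll::Pending => None,
//!     }
//! }
//! ```
//!
//! Creating the effect with `into_effect` runs nothing by itself; the event only comes into
//! existence once something polls the future, which is the reactor's job in the real node.
//!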
//! # Arbitrary effects
//!
//! While it is technically possible to turn any future into an effect, it is in general advisable
//! to only use the methods on [`EffectBuilder`] or short, anonymous futures to create effects.
//!
//! # Announcements and requests
//!
//! Events are usually classified into either announcements or requests, although these properties
//! are not reflected in the type system.
//!
//! **Announcements** are events that are essentially "fire-and-forget"; the component that created
//! the effect resulting in the creation of the announcement will never expect an "answer".
//! Announcements are often dispatched to multiple components by the reactor; since that usually
//! involves a [`clone`](`Clone::clone`), they should be kept light.
//!
//! A good example is the arrival of a new transaction passed in by a client. Depending on the setup
//! it may be stored, buffered or, in certain testing setups, just discarded. None of this is a
//! concern of the component that talks to the client and deserializes the incoming transaction;
//! instead, it simply returns an effect that produces an announcement.
//!
//! **Requests** are complex events that are used when a component needs something from other
//! components. Typically, an effect (which uses [`EffectBuilder::make_request`] in its
//! implementation) is called, resulting in the actual request being scheduled and handled. In
//! contrast to announcements, requests must always be handled by exactly one component.
//!
//! Every request has a [`Responder`]-typed field, which a handler of a request calls to produce
//! another effect that will send the return value to the original requesting component. Dropping a
//! responder without calling [`Responder::respond`] is logged as an error at runtime, unless the
//! node is shutting down.
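//!
//! The request and responder flow can be sketched with standard library channels. This is an
//! illustration under simplifying assumptions, not the crate's implementation: a synchronous
//! `std::sync::mpsc` channel stands in for the oneshot channel, and `SketchResponder`,
//! `LengthRequest`, `sketch_request_parts` and `handle_request` are hypothetical names.
//!
//! ```rust
//! use std::sync::mpsc::{channel, Receiver, Sender};
//!
//! /// Simplified stand-in for `Responder<T>`: it is consumed when the answer is sent.
//! struct SketchResponder<T>(Sender<T>);
//!
//! impl<T> SketchResponder<T> {
//!     /// Sends the response to the requester; a requester that went away is ignored.
//!     fn respond(self, value: T) {
//!         let _ = self.0.send(value);
//!     }
//! }
//!
//! /// Hypothetical request asking a component for the length of a key.
//! enum LengthRequest {
//!     GetLength {
//!         key: String,
//!         responder: SketchResponder<usize>,
//!     },
//! }
//!
//! /// Builds the request event together with the receiving end for its answer.
//! fn sketch_request_parts<T, Q, F>(f: F) -> (Q, Receiver<T>)
//! where
//!     F: FnOnce(SketchResponder<T>) -> Q,
//! {
//!     let (sender, receiver) = channel();
//!     (f(SketchResponder(sender)), receiver)
//! }
//!
//! /// The single component that handles the request and answers through the responder.
//! fn handle_request(request: LengthRequest) {
//!     match request {
//!         LengthRequest::GetLength { key, responder } => responder.respond(key.len()),
//!     }
//! }
//! ```
//!
//! If the handler dropped the request without responding, the receiving end in this sketch would
//! observe a closed channel instead of a value.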
92
93pub(crate) mod announcements;
94pub(crate) mod diagnostics_port;
95pub(crate) mod incoming;
96pub(crate) mod requests;
97
98use std::{
99    any::type_name,
100    borrow::Cow,
101    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
102    fmt::{self, Debug, Display, Formatter},
103    future::Future,
104    mem,
105    sync::Arc,
106    time::{Duration, Instant},
107};
108
109use datasize::DataSize;
110use futures::{channel::oneshot, future::BoxFuture, FutureExt};
111use once_cell::sync::Lazy;
112use serde::{Serialize, Serializer};
113use smallvec::{smallvec, SmallVec};
114use tokio::{sync::Semaphore, time};
115use tracing::{debug, error, warn};
116
117use casper_binary_port::{
118    ConsensusStatus, ConsensusValidatorChanges, LastProgress, NetworkName, RecordId, Uptime,
119};
120use casper_storage::{
121    block_store::types::ApprovalsHashes,
122    data_access_layer::{
123        prefixed_values::{PrefixedValuesRequest, PrefixedValuesResult},
124        tagged_values::{TaggedValuesRequest, TaggedValuesResult},
125        AddressableEntityResult, BalanceRequest, BalanceResult, EraValidatorsRequest,
126        EraValidatorsResult, ExecutionResultsChecksumResult, PutTrieRequest, PutTrieResult,
127        QueryRequest, QueryResult, SeigniorageRecipientsRequest, SeigniorageRecipientsResult,
128        TrieRequest, TrieResult,
129    },
130    DbRawBytesSpec,
131};
132use casper_types::{
133    execution::{Effects as ExecutionEffects, ExecutionResult},
134    Approval, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockSignatures,
135    BlockSynchronizerStatus, BlockV2, ChainspecRawBytes, DeployHash, Digest, EntityAddr, EraId,
136    ExecutionInfo, FinalitySignature, FinalitySignatureId, FinalitySignatureV2, HashAddr, Key,
137    NextUpgrade, Package, PackageAddr, ProtocolUpgradeConfig, PublicKey, TimeDiff, Timestamp,
138    Transaction, TransactionHash, TransactionId, Transfer, U512,
139};
140
141use crate::{
142    components::{
143        block_synchronizer::{
144            GlobalStateSynchronizerError, GlobalStateSynchronizerResponse, TrieAccumulatorError,
145            TrieAccumulatorResponse,
146        },
147        consensus::{ClContext, EraDump, ProposedBlock},
148        contract_runtime::SpeculativeExecutionResult,
149        diagnostics_port::StopAtSpec,
150        fetcher::{FetchItem, FetchResult},
151        gossiper::GossipItem,
152        network::{blocklist::BlocklistJustification, FromIncoming, NetworkInsights},
153        transaction_acceptor,
154    },
155    contract_runtime::ExecutionPreState,
156    failpoints::FailpointActivation,
157    reactor::{main_reactor::ReactorState, EventQueueHandle, QueueKind},
158    types::{
159        appendable_block::AppendableBlock, BlockExecutionResultsOrChunk,
160        BlockExecutionResultsOrChunkId, BlockWithMetadata, ExecutableBlock, FinalizedBlock,
161        InvalidProposalError, LegacyDeploy, MetaBlock, MetaBlockState, NodeId, TransactionHeader,
162    },
163    utils::{fmt_limit::FmtLimit, SharedFlag, Source},
164};
165use announcements::{
166    BlockAccumulatorAnnouncement, ConsensusAnnouncement, ContractRuntimeAnnouncement,
167    ControlAnnouncement, FatalAnnouncement, FetchedNewBlockAnnouncement,
168    FetchedNewFinalitySignatureAnnouncement, GossiperAnnouncement, MetaBlockAnnouncement,
169    PeerBehaviorAnnouncement, QueueDumpFormat, TransactionAcceptorAnnouncement,
170    TransactionBufferAnnouncement, UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement,
171};
172use casper_storage::data_access_layer::EntryPointExistsResult;
173use diagnostics_port::DumpConsensusStateRequest;
174use requests::{
175    AcceptTransactionRequest, BeginGossipRequest, BlockAccumulatorRequest,
176    BlockSynchronizerRequest, BlockValidationRequest, ChainspecRawBytesRequest, ConsensusRequest,
177    ContractRuntimeRequest, FetcherRequest, MakeBlockExecutableRequest, MarkBlockCompletedRequest,
178    MetricsRequest, NetworkInfoRequest, NetworkRequest, ReactorInfoRequest, SetNodeStopRequest,
179    StorageRequest, SyncGlobalStateRequest, TransactionBufferRequest, TrieAccumulatorRequest,
180    UpgradeWatcherRequest,
181};
182
183/// A resource that will never be available, thus trying to acquire it will wait forever.
184static UNOBTAINABLE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(0));
185
186/// A pinned, boxed future that produces one or more events.
187pub(crate) type Effect<Ev> = BoxFuture<'static, Multiple<Ev>>;
188
189/// Multiple effects in a container.
190pub(crate) type Effects<Ev> = Multiple<Effect<Ev>>;
191
/// A small collection of rarely more than two items.
///
/// Stored in a `SmallVec` to avoid allocations in case there are fewer than three items grouped.
/// The size of two items is chosen because one item is the most common use case, and large items
/// are typically boxed. In the latter case, two pointers and one enum variant discriminator are
/// almost the same size as an empty vec, which is two pointers.
198pub(crate) type Multiple<T> = SmallVec<[T; 2]>;
199
200/// The type of peers that should receive the gossip message.
201#[derive(Debug, Serialize, PartialEq, Eq, Hash, Copy, Clone, DataSize)]
202pub(crate) enum GossipTarget {
203    /// Both validators and non validators.
204    Mixed(EraId),
205    /// All peers.
206    All,
207}
208
209impl Display for GossipTarget {
210    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
211        match self {
212            GossipTarget::Mixed(era_id) => write!(formatter, "gossip target mixed for {}", era_id),
213            GossipTarget::All => write!(formatter, "gossip target all"),
214        }
215    }
216}
217
218/// A responder satisfying a request.
219#[must_use]
220#[derive(DataSize)]
221pub(crate) struct Responder<T> {
222    /// Sender through which the response ultimately should be sent.
223    sender: Option<oneshot::Sender<T>>,
224    /// Reactor flag indicating shutdown.
225    is_shutting_down: SharedFlag,
226}
227
228/// A responder that will automatically send a `None` on drop.
229#[must_use]
230#[derive(DataSize, Debug)]
231pub(crate) struct AutoClosingResponder<T>(Responder<Option<T>>);
232
233impl<T> AutoClosingResponder<T> {
234    /// Creates a new auto closing responder from a responder of `Option<T>`.
235    pub(crate) fn from_opt_responder(responder: Responder<Option<T>>) -> Self {
236        AutoClosingResponder(responder)
237    }
238
239    /// Extracts the inner responder.
240    fn into_inner(mut self) -> Responder<Option<T>> {
241        let is_shutting_down = self.0.is_shutting_down;
242        mem::replace(
243            &mut self.0,
244            Responder {
245                sender: None,
246                is_shutting_down,
247            },
248        )
249    }
250}
251
252impl<T: Debug> AutoClosingResponder<T> {
253    /// Send `Some(data)` to the origin of the request.
254    pub(crate) async fn respond(self, data: T) {
255        self.into_inner().respond(Some(data)).await;
256    }
257
258    /// Send `None` to the origin of the request.
259    pub(crate) async fn respond_none(self) {
260        self.into_inner().respond(None).await;
261    }
262}
263
264impl<T> Drop for AutoClosingResponder<T> {
265    fn drop(&mut self) {
266        if let Some(sender) = self.0.sender.take() {
267            debug!(
268                sending_value = %self.0,
269                "responding None by dropping auto-close responder"
270            );
271            // We still haven't answered, send an answer.
272            if let Err(_unsent_value) = sender.send(None) {
273                debug!(
274                    unsent_value = %self.0,
275                    "failed to auto-close responder, ignoring"
276                );
277            }
278        }
279    }
280}
281
282impl<T: 'static + Send> Responder<T> {
283    /// Creates a new `Responder`.
284    #[inline]
285    fn new(sender: oneshot::Sender<T>, is_shutting_down: SharedFlag) -> Self {
286        Responder {
287            sender: Some(sender),
288            is_shutting_down,
289        }
290    }
291
292    /// Helper method for tests.
293    ///
294    /// Allows creating a responder manually, without observing the shutdown flag. This function
295    /// should not be used, unless you are writing alternative infrastructure, e.g. for tests.
296    #[cfg(test)]
297    #[inline]
298    pub(crate) fn without_shutdown(sender: oneshot::Sender<T>) -> Self {
299        Responder::new(sender, SharedFlag::global_shared())
300    }
301}
302
303impl<T: Debug> Responder<T> {
304    /// Send `data` to the origin of the request.
305    pub(crate) async fn respond(mut self, data: T) {
306        if let Some(sender) = self.sender.take() {
307            if let Err(data) = sender.send(data) {
308                // If we cannot send a response down the channel, it means the original requester is
309                // no longer interested in our response. This typically happens during shutdowns, or
310                // in cases where an originating external request has been cancelled.
311
312                debug!(
313                    data=?FmtLimit::new(1000, &data),
314                    "ignored failure to send response to request down oneshot channel"
315                );
316            }
317        } else {
318            error!(
319                data=?FmtLimit::new(1000, &data),
320                "tried to send a value down a responder channel, but it was already used"
321            );
322        }
323    }
324}
325
326impl<T> Debug for Responder<T> {
327    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
328        write!(formatter, "Responder<{}>", type_name::<T>(),)
329    }
330}
331
332impl<T> Display for Responder<T> {
333    fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
334        write!(formatter, "responder({})", type_name::<T>(),)
335    }
336}
337
338impl<T> Drop for Responder<T> {
339    fn drop(&mut self) {
340        if self.sender.is_some() {
341            if self.is_shutting_down.is_set() {
342                debug!(
343                    responder=?self,
344                    "ignored dropping of responder during shutdown"
345                );
346            } else {
347                // This is usually a very serious error, as another component will now be stuck.
348                //
349                // See the code `make_request` for more details.
350                error!(
351                    responder=?self,
352                    "dropped without being responded to outside of shutdown"
353                );
354            }
355        }
356    }
357}
358
359impl<T> Serialize for Responder<T> {
360    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
361        serializer.serialize_str(&format!("{:?}", self))
362    }
363}
364
365impl<T> Serialize for AutoClosingResponder<T> {
366    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
367        self.0.serialize(serializer)
368    }
369}
370
371/// Effect extension for futures, used to convert futures into actual effects.
372pub(crate) trait EffectExt: Future + Send {
373    /// Finalizes a future into an effect that returns a single event.
374    ///
375    /// The function `f` is used to translate the returned value from an effect into an event.
376    fn event<U, F>(self, f: F) -> Effects<U>
377    where
378        F: FnOnce(Self::Output) -> U + 'static + Send,
379        U: 'static,
380        Self: Sized;
381
382    /// Finalizes a future into an effect that runs but drops the result.
383    fn ignore<Ev>(self) -> Effects<Ev>;
384}
385
386/// Effect extension for futures, used to convert futures returning a `Result` into two different
387/// effects.
388pub(crate) trait EffectResultExt {
389    /// The type the future will return if `Ok`.
390    type Value;
391    /// The type the future will return if `Err`.
392    type Error;
393
394    /// Finalizes a future returning a `Result` into two different effects.
395    ///
396    /// The function `f_ok` is used to translate the returned value from an effect into an event,
397    /// while the function `f_err` does the same for a potential error.
398    fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
399    where
400        F: FnOnce(Self::Value) -> U + 'static + Send,
401        G: FnOnce(Self::Error) -> U + 'static + Send,
402        U: 'static;
403}
404
405impl<T> EffectExt for T
406where
407    T: Future + Send + 'static + Sized,
408{
409    fn event<U, F>(self, f: F) -> Effects<U>
410    where
411        F: FnOnce(Self::Output) -> U + 'static + Send,
412        U: 'static,
413    {
414        smallvec![self.map(f).map(|item| smallvec![item]).boxed()]
415    }
416
417    fn ignore<Ev>(self) -> Effects<Ev> {
418        smallvec![self.map(|_| Multiple::new()).boxed()]
419    }
420}
421
422impl<T, V, E> EffectResultExt for T
423where
424    T: Future<Output = Result<V, E>> + Send + 'static + Sized,
425{
426    type Value = V;
427    type Error = E;
428
429    fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
430    where
431        F: FnOnce(V) -> U + 'static + Send,
432        G: FnOnce(E) -> U + 'static + Send,
433        U: 'static,
434    {
435        smallvec![self
436            .map(|result| result.map_or_else(f_err, f_ok))
437            .map(|item| smallvec![item])
438            .boxed()]
439    }
440}
441
442/// A builder for [`Effect`](type.Effect.html)s.
443///
444/// Provides methods allowing the creation of effects which need to be scheduled on the reactor's
445/// event queue, without giving direct access to this queue.
446///
447/// The `REv` type parameter indicates which reactor event effects created by this builder will
448/// produce as side effects.
449#[derive(Debug)]
450pub(crate) struct EffectBuilder<REv: 'static> {
451    /// A handle to the referenced event queue.
452    event_queue: EventQueueHandle<REv>,
453}
454
455// Implement `Clone` and `Copy` manually, as `derive` will make it depend on `REv` otherwise.
456impl<REv> Clone for EffectBuilder<REv> {
457    fn clone(&self) -> Self {
458        *self
459    }
460}
461
462impl<REv> Copy for EffectBuilder<REv> {}
463
464impl<REv> EffectBuilder<REv> {
465    /// Creates a new effect builder.
466    pub(crate) fn new(event_queue: EventQueueHandle<REv>) -> Self {
467        EffectBuilder { event_queue }
468    }
469
470    /// Extract the event queue handle out of the effect builder.
471    pub(crate) fn into_inner(self) -> EventQueueHandle<REv> {
472        self.event_queue
473    }
474
475    /// Performs a request.
476    ///
477    /// Given a request `Q`, that when completed will yield a result of `T`, produces a future that
478    /// will
479    ///
480    /// 1. create an event to send the request to the respective component (thus `Q: Into<REv>`),
481    /// 2. wait for a response and return it.
482    ///
483    /// This function is usually only used internally by effects implemented on the effects builder,
484    /// but IO components may also make use of it.
485    ///
486    /// # Cancellation safety
487    ///
488    /// This future is cancellation safe: If it is dropped without being polled, it indicates
489    /// that the original requester is no longer interested in the result, which will be discarded.
490    pub(crate) async fn make_request<T, Q, F>(self, f: F, queue_kind: QueueKind) -> T
491    where
492        T: Send + 'static,
493        Q: Into<REv>,
494        F: FnOnce(Responder<T>) -> Q,
495    {
496        let (event, wait_future) = self.create_request_parts(f);
497
498        // Schedule the request before awaiting the response.
499        self.event_queue.schedule(event, queue_kind).await;
500        wait_future.await
501    }
502
    /// Creates the parts necessary to make a request.
    ///
    /// A request usually consists of two parts: The request event that needs to be scheduled on the
    /// reactor queue and the associated future that allows waiting for the response. This function
    /// creates both of them without processing or spawning either.
    ///
    /// Usually you will want to call the higher level `make_request` function.
510    pub(crate) fn create_request_parts<T, Q, F>(self, f: F) -> (REv, impl Future<Output = T>)
511    where
512        T: Send + 'static,
513        Q: Into<REv>,
514        F: FnOnce(Responder<T>) -> Q,
515    {
516        // Prepare a channel.
517        let (sender, receiver) = oneshot::channel();
518
519        // Create response function.
520        let responder = Responder::new(sender, self.event_queue.shutdown_flag());
521
522        // Now inject the request event into the event loop.
523        let request_event = f(responder).into();
524
525        let fut = async move {
526            match receiver.await {
527                Ok(value) => value,
528                Err(err) => {
529                    // The channel should usually not be closed except during shutdowns, as it
530                    // indicates a panic or disappearance of the remote that is
531                    // supposed to process the request.
532                    //
533                    // If it does happen, we pretend nothing happened instead of crashing.
534                    if self.event_queue.shutdown_flag().is_set() {
535                        debug!(%err, channel=?type_name::<T>(), "ignoring closed channel due to shutdown");
536                    } else {
537                        error!(%err, channel=?type_name::<T>(), "request for channel closed, this may be a bug? \
538                            check if a component is stuck from now on");
539                    }
540
541                    // We cannot produce any value to satisfy the request, so we just abandon this
542                    // task by waiting on a resource we can never acquire.
543                    let _ = UNOBTAINABLE.acquire().await;
544                    panic!("should never obtain unobtainable semaphore");
545                }
546            }
547        };
548
549        (request_event, fut)
550    }
551
    /// Creates an effect that completes immediately.
    ///
    /// Can be used to trigger events from effects when combined with `.event`. Do not use this to
    /// "do nothing", as it will still cause a task to be spawned.
556    #[inline(always)]
557    #[allow(clippy::manual_async_fn)]
558    pub(crate) fn immediately(self) -> impl Future<Output = ()> + Send {
559        // Note: This function is implemented manually without `async` sugar because the `Send`
560        // inference seems to not work in all cases otherwise.
561        async {}
562    }
563
564    /// Reports a fatal error.  Normally called via the `crate::fatal!()` macro.
565    ///
566    /// Usually causes the node to cease operations quickly and exit/crash.
567    pub(crate) async fn fatal(self, file: &'static str, line: u32, msg: String)
568    where
569        REv: From<FatalAnnouncement>,
570    {
571        self.event_queue
572            .schedule(FatalAnnouncement { file, line, msg }, QueueKind::Control)
573            .await;
574    }
575
576    /// Sets a timeout.
577    pub(crate) async fn set_timeout(self, timeout: Duration) -> Duration {
578        let then = Instant::now();
579        time::sleep(timeout).await;
580        then.elapsed()
581    }
582
    /// Retrieves a snapshot of the node's current metrics, formatted as a string.
    ///
    /// If an error occurred producing the metrics, `None` is returned.
586    pub(crate) async fn get_metrics(self) -> Option<String>
587    where
588        REv: From<MetricsRequest>,
589    {
590        self.make_request(
591            |responder| MetricsRequest::RenderNodeMetricsText { responder },
592            QueueKind::Api,
593        )
594        .await
595    }
596
    /// Sends a network message.
    ///
    /// The message is queued and sent, but delivery is not guaranteed. Returns after the message
    /// has been buffered in the outgoing kernel buffer and thus is subject to backpressure.
601    pub(crate) async fn send_message<P>(self, dest: NodeId, payload: P)
602    where
603        REv: From<NetworkRequest<P>>,
604    {
605        self.make_request(
606            |responder| NetworkRequest::SendMessage {
607                dest: Box::new(dest),
608                payload: Box::new(payload),
609                respond_after_queueing: false,
610                auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
611            },
612            QueueKind::Network,
613        )
614        .await;
615    }
616
    /// Enqueues a network message.
    ///
    /// The message is queued in "fire-and-forget" fashion; there is no guarantee that the peer
    /// will receive it. Returns as soon as the message is queued inside the networking component.
621    pub(crate) async fn enqueue_message<P>(self, dest: NodeId, payload: P)
622    where
623        REv: From<NetworkRequest<P>>,
624    {
625        self.make_request(
626            |responder| NetworkRequest::SendMessage {
627                dest: Box::new(dest),
628                payload: Box::new(payload),
629                respond_after_queueing: true,
630                auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
631            },
632            QueueKind::Network,
633        )
634        .await;
635    }
636
637    /// Broadcasts a network message to validator peers in the given era.
638    pub(crate) async fn broadcast_message_to_validators<P>(self, payload: P, era_id: EraId)
639    where
640        REv: From<NetworkRequest<P>>,
641    {
642        self.make_request(
643            |responder| {
644                debug!("validator broadcast for {}", era_id);
645                NetworkRequest::ValidatorBroadcast {
646                    payload: Box::new(payload),
647                    era_id,
648                    auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
649                }
650            },
651            QueueKind::Network,
652        )
653        .await;
654    }
655
    /// Gossips a network message.
    ///
    /// A low-level "gossip" function that selects `count` randomly chosen nodes on the network,
    /// excluding the indicated ones, and sends each a copy of the message.
    ///
    /// Returns the IDs of the chosen nodes.
662    pub(crate) async fn gossip_message<P>(
663        self,
664        payload: P,
665        gossip_target: GossipTarget,
666        count: usize,
667        exclude: HashSet<NodeId>,
668    ) -> HashSet<NodeId>
669    where
670        REv: From<NetworkRequest<P>>,
671        P: Send,
672    {
673        self.make_request(
674            |responder| NetworkRequest::Gossip {
675                payload: Box::new(payload),
676                gossip_target,
677                count,
678                exclude,
679                auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
680            },
681            QueueKind::Network,
682        )
683        .await
684        .unwrap_or_default()
685    }
686
687    /// Gets a structure describing the current network status.
688    pub(crate) async fn get_network_insights(self) -> NetworkInsights
689    where
690        REv: From<NetworkInfoRequest>,
691    {
692        self.make_request(
693            |responder| NetworkInfoRequest::Insight { responder },
694            QueueKind::Regular,
695        )
696        .await
697    }
698
699    /// Gets a map of the current network peers to their socket addresses.
700    pub(crate) async fn network_peers(self) -> BTreeMap<NodeId, String>
701    where
702        REv: From<NetworkInfoRequest>,
703    {
704        self.make_request(
705            |responder| NetworkInfoRequest::Peers { responder },
706            QueueKind::Api,
707        )
708        .await
709    }
710
711    /// Gets up to `count` fully-connected network peers in random order.
712    pub async fn get_fully_connected_peers(self, count: usize) -> Vec<NodeId>
713    where
714        REv: From<NetworkInfoRequest>,
715    {
716        self.make_request(
717            |responder| NetworkInfoRequest::FullyConnectedPeers { count, responder },
718            QueueKind::NetworkInfo,
719        )
720        .await
721    }
722
723    /// Announces which transactions have expired.
724    pub(crate) async fn announce_expired_transactions(self, hashes: Vec<TransactionHash>)
725    where
726        REv: From<TransactionBufferAnnouncement>,
727    {
728        self.event_queue
729            .schedule(
730                TransactionBufferAnnouncement::TransactionsExpired(hashes),
731                QueueKind::Validation,
732            )
733            .await;
734    }
735
736    /// Announces an incoming network message.
737    pub(crate) async fn announce_incoming<P>(self, sender: NodeId, payload: P)
738    where
739        REv: FromIncoming<P>,
740    {
741        self.event_queue
742            .schedule(
743                <REv as FromIncoming<P>>::from_incoming(sender, payload),
744                QueueKind::NetworkIncoming,
745            )
746            .await;
747    }
748
749    /// Announces that a gossiper has received a new item, where the item's ID is the complete item.
750    pub(crate) async fn announce_complete_item_received_via_gossip<T: GossipItem>(self, item: T::Id)
751    where
752        REv: From<GossiperAnnouncement<T>>,
753    {
754        assert!(
755            T::ID_IS_COMPLETE_ITEM,
756            "{} must be an item where the ID _is_ the complete item",
757            item
758        );
759        self.event_queue
760            .schedule(
761                GossiperAnnouncement::NewCompleteItem(item),
762                QueueKind::Gossip,
763            )
764            .await;
765    }
766
767    /// Announces that a gossiper has received a full item, where the item's ID is NOT the complete
768    /// item.
769    pub(crate) async fn announce_item_body_received_via_gossip<T: GossipItem>(
770        self,
771        item: Box<T>,
772        sender: NodeId,
773    ) where
774        REv: From<GossiperAnnouncement<T>>,
775    {
776        self.event_queue
777            .schedule(
778                GossiperAnnouncement::NewItemBody { item, sender },
779                QueueKind::Gossip,
780            )
781            .await;
782    }
783
784    /// Announces that the block accumulator has received and stored a new finality signature.
    pub(crate) async fn announce_finality_signature_accepted(
        self,
        finality_signature: Box<FinalitySignatureV2>,
    ) where
        REv: From<BlockAccumulatorAnnouncement>,
    {
        self.event_queue
            .schedule(
                BlockAccumulatorAnnouncement::AcceptedNewFinalitySignature { finality_signature },
                QueueKind::FinalitySignature,
            )
            .await;
    }

    /// Requests that a block be made executable, returning an `ExecutableBlock` if possible.
    ///
    /// Completion means that the block can be enqueued for processing by the execution engine via
    /// the `contract_runtime` component.
    pub(crate) async fn make_block_executable(
        self,
        block_hash: BlockHash,
    ) -> Option<ExecutableBlock>
    where
        REv: From<MakeBlockExecutableRequest>,
    {
        self.make_request(
            |responder| MakeBlockExecutableRequest {
                block_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Requests that a block with a specific height be marked completed.
    ///
    /// Completion means that the block itself (along with its header) and all of its transactions
    /// have been persisted to storage and its global state root hash is missing no dependencies
    /// in the global state.
    pub(crate) async fn mark_block_completed(self, block_height: u64) -> bool
    where
        REv: From<MarkBlockCompletedRequest>,
    {
        self.make_request(
            |responder| MarkBlockCompletedRequest {
                block_height,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Tries to accept a transaction received from the JSON-RPC server.
    pub(crate) async fn try_accept_transaction(
        self,
        transaction: Transaction,
        is_speculative: bool,
    ) -> Result<(), transaction_acceptor::Error>
    where
        REv: From<AcceptTransactionRequest>,
    {
        self.make_request(
            |responder| AcceptTransactionRequest {
                transaction,
                is_speculative,
                responder,
            },
            QueueKind::Api,
        )
        .await
    }

    /// Announces that a transaction not previously stored has now been accepted and stored.
    pub(crate) fn announce_new_transaction_accepted(
        self,
        transaction: Arc<Transaction>,
        source: Source,
    ) -> impl Future<Output = ()>
    where
        REv: From<TransactionAcceptorAnnouncement>,
    {
        self.event_queue.schedule(
            TransactionAcceptorAnnouncement::AcceptedNewTransaction {
                transaction,
                source,
            },
            QueueKind::Validation,
        )
    }

    /// Announces that we have received a gossip message from this peer,
    /// implying the peer holds the indicated item.
    pub(crate) async fn announce_gossip_received<T>(self, item_id: T::Id, sender: NodeId)
    where
        REv: From<GossiperAnnouncement<T>>,
        T: GossipItem,
    {
        self.event_queue
            .schedule(
                GossiperAnnouncement::GossipReceived { item_id, sender },
                QueueKind::Gossip,
            )
            .await;
    }

    /// Announces that we have finished gossiping the indicated item.
    pub(crate) async fn announce_finished_gossiping<T>(self, item_id: T::Id)
    where
        REv: From<GossiperAnnouncement<T>>,
        T: GossipItem,
    {
        self.event_queue
            .schedule(
                GossiperAnnouncement::FinishedGossiping(item_id),
                QueueKind::Gossip,
            )
            .await;
    }

    /// Announces that an invalid transaction has been received.
    pub(crate) fn announce_invalid_transaction(
        self,
        transaction: Transaction,
        source: Source,
    ) -> impl Future<Output = ()>
    where
        REv: From<TransactionAcceptorAnnouncement>,
    {
        self.event_queue.schedule(
            TransactionAcceptorAnnouncement::InvalidTransaction {
                transaction,
                source,
            },
            QueueKind::Validation,
        )
    }

    /// Announces that the upgrade activation point has been read.
    pub(crate) async fn upgrade_watcher_announcement(self, maybe_next_upgrade: Option<NextUpgrade>)
    where
        REv: From<UpgradeWatcherAnnouncement>,
    {
        self.event_queue
            .schedule(
                UpgradeWatcherAnnouncement(maybe_next_upgrade),
                QueueKind::Control,
            )
            .await;
    }

    /// Announces that a step was committed successfully.
    pub(crate) async fn announce_commit_step_success(self, era_id: EraId, effects: ExecutionEffects)
    where
        REv: From<ContractRuntimeAnnouncement>,
    {
        self.event_queue
            .schedule(
                ContractRuntimeAnnouncement::CommitStepSuccess { era_id, effects },
                QueueKind::ContractRuntime,
            )
            .await;
    }

    /// Schedules an update of the contract runtime's execution pre-state.
    pub(crate) async fn update_contract_runtime_state(self, new_pre_state: ExecutionPreState)
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.event_queue
            .schedule(
                ContractRuntimeRequest::UpdatePreState { new_pre_state },
                QueueKind::ContractRuntime,
            )
            .await;
    }

    /// Announces the validators for upcoming eras.
    pub(crate) async fn announce_upcoming_era_validators(
        self,
        era_that_is_ending: EraId,
        upcoming_era_validators: BTreeMap<EraId, BTreeMap<PublicKey, U512>>,
    ) where
        REv: From<ContractRuntimeAnnouncement>,
    {
        self.event_queue
            .schedule(
                ContractRuntimeAnnouncement::UpcomingEraValidators {
                    era_that_is_ending,
                    upcoming_era_validators,
                },
                QueueKind::ContractRuntime,
            )
            .await;
    }

    /// Announces the gas price for the next era.
    pub(crate) async fn announce_new_era_gas_price(self, era_id: EraId, next_era_gas_price: u8)
    where
        REv: From<ContractRuntimeAnnouncement>,
    {
        self.event_queue
            .schedule(
                ContractRuntimeAnnouncement::NextEraGasPrice {
                    era_id,
                    next_era_gas_price,
                },
                QueueKind::ContractRuntime,
            )
            .await;
    }

    /// Begins gossiping an item.
    pub(crate) async fn begin_gossip<T>(self, item_id: T::Id, source: Source, target: GossipTarget)
    where
        T: GossipItem,
        REv: From<BeginGossipRequest<T>>,
    {
        self.make_request(
            |responder| BeginGossipRequest {
                item_id,
                source,
                target,
                responder,
            },
            QueueKind::Gossip,
        )
        .await;
    }

    /// Puts the given block into the linear block store.
    pub(crate) async fn put_block_to_storage(self, block: Arc<Block>) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutBlock { block, responder },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Puts the given approvals hashes into the linear block store.
    pub(crate) async fn put_approvals_hashes_to_storage(
        self,
        approvals_hashes: Box<ApprovalsHashes>,
    ) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutApprovalsHashes {
                approvals_hashes,
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Puts the given block, approvals hashes and execution results into the linear block store.
    pub(crate) async fn put_executed_block_to_storage(
        self,
        block: Arc<BlockV2>,
        approvals_hashes: Box<ApprovalsHashes>,
        execution_results: HashMap<TransactionHash, ExecutionResult>,
    ) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutExecutedBlock {
                block,
                approvals_hashes,
                execution_results,
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Gets the requested block from the linear block store.
    pub(crate) async fn get_block_from_storage(self, block_hash: BlockHash) -> Option<Block>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlock {
                block_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Requests the block utilization score for the given era and block height from storage.
    pub(crate) async fn get_block_utilization(
        self,
        era_id: EraId,
        block_height: u64,
        transaction_count: u64,
    ) -> Option<(u64, u64)>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockUtilizationScore {
                era_id,
                block_height,
                switch_block_utilization: transaction_count,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Checks whether the block with the given hash is in storage.
    pub(crate) async fn is_block_stored(self, block_hash: BlockHash) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::IsBlockStored {
                block_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the requested `ApprovalsHashes` from storage.
    pub(crate) async fn get_approvals_hashes_from_storage(
        self,
        block_hash: BlockHash,
    ) -> Option<ApprovalsHashes>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetApprovalsHashes {
                block_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the raw data stored under the given record ID and key.
    pub(crate) async fn get_raw_data(
        self,
        record_id: RecordId,
        key: Vec<u8>,
    ) -> Option<DbRawBytesSpec>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetRawData {
                record_id,
                key,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the requested block header from the linear block store.
    pub(crate) async fn get_block_header_from_storage(
        self,
        block_hash: BlockHash,
        only_from_available_block_range: bool,
    ) -> Option<BlockHeader>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockHeader {
                block_hash,
                only_from_available_block_range,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the block header at the given height from storage.
    pub(crate) async fn get_block_header_at_height_from_storage(
        self,
        block_height: u64,
        only_from_available_block_range: bool,
    ) -> Option<BlockHeader>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockHeaderByHeight {
                block_height,
                only_from_available_block_range,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the latest switch block header from storage.
    pub(crate) async fn get_latest_switch_block_header_from_storage(self) -> Option<BlockHeader>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetLatestSwitchBlockHeader { responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the switch block header for the given era from storage.
    pub(crate) async fn get_switch_block_header_by_era_id_from_storage(
        self,
        era_id: EraId,
    ) -> Option<BlockHeader>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetSwitchBlockHeaderByEra { era_id, responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the requested signature for a given block hash.
    pub(crate) async fn get_signature_from_storage(
        self,
        block_hash: BlockHash,
        public_key: PublicKey,
    ) -> Option<FinalitySignature>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockSignature {
                block_hash,
                public_key: Box::new(public_key),
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the execution results for the given block from storage.
    pub(crate) async fn get_execution_results_from_storage(
        self,
        block_hash: BlockHash,
    ) -> Option<Vec<(TransactionHash, TransactionHeader, ExecutionResult)>>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetExecutionResults {
                block_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Puts a block header to storage.
    pub(crate) async fn put_block_header_to_storage(self, block_header: Box<BlockHeader>) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutBlockHeader {
                block_header,
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Puts the requested block signatures into storage.
    ///
    /// If `signatures.proofs` is empty, no attempt to store will be made, an error will be logged,
    /// and this function will return `false`.
    pub(crate) async fn put_signatures_to_storage(self, signatures: BlockSignatures) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutBlockSignatures {
                signatures,
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Puts the given finality signature into storage.
    pub(crate) async fn put_finality_signature_to_storage(
        self,
        signature: FinalitySignature,
    ) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutFinalitySignature {
                signature: Box::new(signature),
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Gets the requested block's transfers from storage.
    pub(crate) async fn get_block_transfers_from_storage(
        self,
        block_hash: BlockHash,
    ) -> Option<Vec<Transfer>>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockTransfers {
                block_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Returns the era IDs of the blocks in which the given transactions were executed.  If none
    /// of the transactions have been executed yet, an empty set will be returned.
    pub(crate) async fn get_transactions_era_ids(
        self,
        transaction_hashes: HashSet<TransactionHash>,
    ) -> HashSet<EraId>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetTransactionsEraIds {
                transaction_hashes,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Requests the highest complete block.
    pub(crate) async fn get_highest_complete_block_from_storage(self) -> Option<Block>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetHighestCompleteBlock { responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Requests the highest complete block header.
    pub(crate) async fn get_highest_complete_block_header_from_storage(self) -> Option<BlockHeader>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetHighestCompleteBlockHeader { responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Requests the height range of fully available blocks (not just block headers).
    pub(crate) async fn get_available_block_range_from_storage(self) -> AvailableBlockRange
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetAvailableBlockRange { responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Synchronizes global state under the given root hash.
    pub(crate) async fn sync_global_state(
        self,
        block_hash: BlockHash,
        state_root_hash: Digest,
    ) -> Result<GlobalStateSynchronizerResponse, GlobalStateSynchronizerError>
    where
        REv: From<SyncGlobalStateRequest>,
    {
        self.make_request(
            |responder| SyncGlobalStateRequest {
                block_hash,
                state_root_hash,
                responder,
            },
            QueueKind::SyncGlobalState,
        )
        .await
    }

    /// Gets a trie or chunk by its ID.
    pub(crate) async fn get_trie(self, request: TrieRequest) -> TrieResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetTrie { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Gets the current reactor state.
    pub(crate) async fn get_reactor_state(self) -> ReactorState
    where
        REv: From<ReactorInfoRequest>,
    {
        self.make_request(
            |responder| ReactorInfoRequest::ReactorState { responder },
            QueueKind::Regular,
        )
        .await
    }

    /// Gets the reactor's last progress.
    pub(crate) async fn get_last_progress(self) -> LastProgress
    where
        REv: From<ReactorInfoRequest>,
    {
        self.make_request(
            |responder| ReactorInfoRequest::LastProgress { responder },
            QueueKind::Regular,
        )
        .await
    }

    /// Gets the uptime.
    pub(crate) async fn get_uptime(self) -> Uptime
    where
        REv: From<ReactorInfoRequest>,
    {
        self.make_request(
            |responder| ReactorInfoRequest::Uptime { responder },
            QueueKind::Regular,
        )
        .await
    }

    /// Gets the network name.
    pub(crate) async fn get_network_name(self) -> NetworkName
    where
        REv: From<ReactorInfoRequest>,
    {
        self.make_request(
            |responder| ReactorInfoRequest::NetworkName { responder },
            QueueKind::Regular,
        )
        .await
    }

    /// Gets the balance holds interval.
    #[allow(unused)]
    pub(crate) async fn get_balance_holds_interval(self) -> TimeDiff
    where
        REv: From<ReactorInfoRequest>,
    {
        self.make_request(
            |responder| ReactorInfoRequest::BalanceHoldsInterval { responder },
            QueueKind::Regular,
        )
        .await
    }

    /// Gets the current status of the block synchronizer.
    pub(crate) async fn get_block_synchronizer_status(self) -> BlockSynchronizerStatus
    where
        REv: From<BlockSynchronizerRequest>,
    {
        self.make_request(
            |responder| BlockSynchronizerRequest::Status { responder },
            QueueKind::Regular,
        )
        .await
    }

    /// Puts a trie into the trie store; succeeds only if all the children of the trie are already
    /// present in the store.
    /// Returns the digest under which the trie was stored if successful.
    pub(crate) async fn put_trie_if_all_children_present(
        self,
        request: PutTrieRequest,
    ) -> PutTrieResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::PutTrie { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Gets the gas price for the given era.
    pub(crate) async fn get_current_gas_price(self, era_id: EraId) -> Option<u8>
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetEraGasPrice { era_id, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Puts the given transaction into storage.
    pub(crate) async fn put_transaction_to_storage(self, transaction: Transaction) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutTransaction {
                transaction: Arc::new(transaction),
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Gets the requested transactions from storage.
    ///
    /// Returns the "original" transactions, which are the first received by the node, along with a
    /// potentially different set of approvals used during execution of the recorded block.
    pub(crate) async fn get_transactions_from_storage(
        self,
        transaction_hashes: Vec<TransactionHash>,
    ) -> SmallVec<[Option<(Transaction, Option<BTreeSet<Approval>>)>; 1]>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetTransactions {
                transaction_hashes,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the requested transaction and its execution info from storage by `TransactionHash`.
    pub(crate) async fn get_transaction_and_exec_info_from_storage(
        self,
        transaction_hash: TransactionHash,
        with_finalized_approvals: bool,
    ) -> Option<(Transaction, Option<ExecutionInfo>)>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetTransactionAndExecutionInfo {
                transaction_hash,
                with_finalized_approvals,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the requested legacy deploy from the legacy deploy store by `DeployHash` only.
    ///
    /// Returns the legacy deploy containing the set of approvals used during execution of the
    /// recorded block, if known.
    pub(crate) async fn get_stored_legacy_deploy(
        self,
        deploy_hash: DeployHash,
    ) -> Option<LegacyDeploy>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetLegacyDeploy {
                deploy_hash,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the requested transaction from storage by `TransactionId`.
    ///
    /// Returns the "original" transaction, which is the first received by the node, along with a
    /// potentially different set of approvals used during execution of the recorded block.
    pub(crate) async fn get_stored_transaction(
        self,
        transaction_id: TransactionId,
    ) -> Option<Transaction>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetTransaction {
                transaction_id,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Checks whether the transaction with the given ID is in storage.
    pub(crate) async fn is_transaction_stored(self, transaction_id: TransactionId) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::IsTransactionStored {
                transaction_id,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Stores the given execution results for the transactions in the given block in the linear
    /// block store.
    pub(crate) async fn put_execution_artifacts_to_storage(
        self,
        block_hash: BlockHash,
        block_height: u64,
        era_id: EraId,
        execution_results: HashMap<TransactionHash, ExecutionResult>,
    ) where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::PutExecutionResults {
                block_hash: Box::new(block_hash),
                block_height,
                era_id,
                execution_results,
                responder,
            },
            QueueKind::ToStorage,
        )
        .await;
    }

    /// Gets the requested block and its finality signatures.
    pub(crate) async fn get_block_at_height_with_metadata_from_storage(
        self,
        block_height: u64,
        only_from_available_block_range: bool,
    ) -> Option<BlockWithMetadata>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockAndMetadataByHeight {
                block_height,
                only_from_available_block_range,
                responder,
            },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets the blocks and metadata at all heights in `range`, requesting them concurrently.
    pub(crate) async fn collect_past_blocks_with_metadata(
        self,
        range: std::ops::Range<u64>,
        only_from_available_block_range: bool,
    ) -> Vec<Option<BlockWithMetadata>>
    where
        REv: From<StorageRequest>,
    {
        // `join_all` already yields a `Vec` of the results, in the order of `range`.
        futures::future::join_all(range.map(|block_height| {
            self.get_block_at_height_with_metadata_from_storage(
                block_height,
                only_from_available_block_range,
            )
        }))
        .await
    }

    /// Gets the requested finality signature from storage.
    pub(crate) async fn get_finality_signature_from_storage(
        self,
        id: Box<FinalitySignatureId>,
    ) -> Option<FinalitySignature>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetFinalitySignature { id, responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Checks whether the finality signature with the given ID is in storage.
    pub(crate) async fn is_finality_signature_stored(self, id: Box<FinalitySignatureId>) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::IsFinalitySignatureStored { id, responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Fetches an item from a fetcher.
    pub(crate) async fn fetch<T>(
        self,
        id: T::Id,
        peer: NodeId,
        validation_metadata: Box<T::ValidationMetadata>,
    ) -> FetchResult<T>
    where
        REv: From<FetcherRequest<T>>,
        T: FetchItem + 'static,
    {
        self.make_request(
            |responder| FetcherRequest {
                id,
                peer,
                validation_metadata,
                responder,
            },
            QueueKind::Fetch,
        )
        .await
    }

    /// Requests the trie with the given hash from the given peers.
    pub(crate) async fn fetch_trie(
        self,
        hash: Digest,
        peers: Vec<NodeId>,
    ) -> Result<TrieAccumulatorResponse, TrieAccumulatorError>
    where
        REv: From<TrieAccumulatorRequest>,
    {
        self.make_request(
            |responder| TrieAccumulatorRequest {
                hash,
                peers,
                responder,
            },
            QueueKind::SyncGlobalState,
        )
        .await
    }

    /// Passes the timestamp of a future block for which transactions are to be proposed.
1746    pub(crate) async fn request_appendable_block(
1747        self,
1748        timestamp: Timestamp,
1749        era_id: EraId,
1750        request_expiry: Timestamp,
1751    ) -> AppendableBlock
1752    where
1753        REv: From<TransactionBufferRequest>,
1754    {
1755        self.make_request(
1756            |responder| TransactionBufferRequest::GetAppendableBlock {
1757                timestamp,
1758                era_id,
1759                request_expiry,
1760                responder,
1761            },
1762            QueueKind::Consensus,
1763        )
1764        .await
1765    }
1766
1767    /// Enqueues a finalized block execution.
1768    pub(crate) async fn enqueue_block_for_execution(
        self,
        executable_block: ExecutableBlock,
        meta_block_state: MetaBlockState,
    ) where
        REv: From<StorageRequest> + From<ContractRuntimeRequest>,
    {
        // Get the key block height for the current protocol version's activation point, i.e. the
        // height of the final block of the previous protocol version.
        let key_block_height_for_activation_point = self
            .make_request(
                |responder| StorageRequest::GetKeyBlockHeightForActivationPoint { responder },
                QueueKind::FromStorage,
            )
            .await
            .unwrap_or_else(|| {
                warn!("key block height for current activation point unknown");
                0
            });

        self.event_queue
            .schedule(
                ContractRuntimeRequest::EnqueueBlockForExecution {
                    executable_block,
                    key_block_height_for_activation_point,
                    meta_block_state,
                },
                QueueKind::ContractRuntime,
            )
            .await;
    }

    /// Enqueues a protocol upgrade to be performed by the contract runtime.
    pub(crate) async fn enqueue_protocol_upgrade(
        self,
        upgrade_config: ProtocolUpgradeConfig,
        next_block_height: u64,
        parent_hash: BlockHash,
        parent_seed: Digest,
    ) where
        REv: From<ContractRuntimeRequest>,
    {
        self.event_queue
            .schedule(
                ContractRuntimeRequest::DoProtocolUpgrade {
                    protocol_upgrade_config: upgrade_config,
                    next_block_height,
                    parent_hash,
                    parent_seed,
                },
                QueueKind::Control,
            )
            .await;
    }

    /// Checks whether the transactions included in the block exist on the network and that
    /// the block is valid.
    pub(crate) async fn validate_block(
        self,
        sender: NodeId,
        proposed_block_height: u64,
        block: ProposedBlock<ClContext>,
    ) -> Result<(), Box<InvalidProposalError>>
    where
        REv: From<BlockValidationRequest>,
    {
        self.make_request(
            |responder| BlockValidationRequest {
                proposed_block_height,
                block,
                sender,
                responder,
            },
            QueueKind::Regular,
        )
        .await
    }

    /// Announces that a block has been proposed.
    pub(crate) async fn announce_proposed_block(self, proposed_block: ProposedBlock<ClContext>)
    where
        REv: From<ConsensusAnnouncement>,
    {
        self.event_queue
            .schedule(
                ConsensusAnnouncement::Proposed(Box::new(proposed_block)),
                QueueKind::Consensus,
            )
            .await;
    }

    /// Announces that a block has been finalized.
    pub(crate) async fn announce_finalized_block(self, finalized_block: FinalizedBlock)
    where
        REv: From<ConsensusAnnouncement>,
    {
        self.event_queue
            .schedule(
                ConsensusAnnouncement::Finalized(Box::new(finalized_block)),
                QueueKind::Consensus,
            )
            .await;
    }

    /// Announces that a meta block has been created or its state has changed.
    pub(crate) async fn announce_meta_block(self, meta_block: MetaBlock)
    where
        REv: From<MetaBlockAnnouncement>,
    {
        self.event_queue
            .schedule(MetaBlockAnnouncement(meta_block), QueueKind::Regular)
            .await;
    }

    /// Announces that a finalized block has been created, but it was not executed.
    pub(crate) async fn announce_unexecuted_block(self, block_height: u64)
    where
        REv: From<UnexecutedBlockAnnouncement>,
    {
        self.event_queue
            .schedule(
                UnexecutedBlockAnnouncement(block_height),
                QueueKind::Regular,
            )
            .await;
    }

    /// Announces that an equivocation has been detected.
    pub(crate) async fn announce_fault_event(
        self,
        era_id: EraId,
        public_key: PublicKey,
        timestamp: Timestamp,
    ) where
        REv: From<ConsensusAnnouncement>,
    {
        self.event_queue
            .schedule(
                ConsensusAnnouncement::Fault {
                    era_id,
                    public_key: Box::new(public_key),
                    timestamp,
                },
                QueueKind::Consensus,
            )
            .await;
    }

    /// Blocks a specific peer due to a transgression.
    ///
    /// This function also logs the ban, including the offender and the justification.
    pub(crate) async fn announce_block_peer_with_justification(
        self,
        offender: NodeId,
        justification: BlocklistJustification,
    ) where
        REv: From<PeerBehaviorAnnouncement>,
    {
        warn!(%offender, %justification, "banning peer");
        self.event_queue
            .schedule(
                PeerBehaviorAnnouncement::OffenseCommitted {
                    offender: Box::new(offender),
                    justification: Box::new(justification),
                },
                QueueKind::NetworkInfo,
            )
            .await;
    }

    /// Gets the next scheduled upgrade, if any.
    pub(crate) async fn get_next_upgrade(self) -> Option<NextUpgrade>
    where
        REv: From<UpgradeWatcherRequest> + Send,
    {
        self.make_request(UpgradeWatcherRequest, QueueKind::Control)
            .await
    }

    /// Requests a query be executed on the Contract Runtime component.
    pub(crate) async fn query_global_state(self, request: QueryRequest) -> QueryResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::Query { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Retrieves an `AddressableEntity` from under the given entity address (or key, if the former
    /// is not found) in global state.
    pub(crate) async fn get_addressable_entity(
        self,
        state_root_hash: Digest,
        entity_addr: EntityAddr,
    ) -> AddressableEntityResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetAddressableEntity {
                state_root_hash,
                entity_addr,
                responder,
            },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Checks whether an entry point with the given name exists under the given contract hash in
    /// global state.
    pub(crate) async fn does_entry_point_exist(
        self,
        state_root_hash: Digest,
        contract_hash: HashAddr,
        entry_point_name: String,
    ) -> EntryPointExistsResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetEntryPointExists {
                state_root_hash,
                contract_hash,
                entry_point_name,
                responder,
            },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Retrieves a `Package` from under the given key in global state if present.
    pub(crate) async fn get_package(
        self,
        state_root_hash: Digest,
        package_addr: PackageAddr,
    ) -> Option<Box<Package>>
    where
        REv: From<ContractRuntimeRequest>,
    {
        let key = Key::Hash(package_addr);
        let query_request = QueryRequest::new(state_root_hash, key, vec![]);

        match self.query_global_state(query_request).await {
            QueryResult::RootNotFound | QueryResult::Failure(_) => None,
            QueryResult::ValueNotFound(_) => {
                let query_request =
                    QueryRequest::new(state_root_hash, Key::SmartContract(package_addr), vec![]);
                debug!("requesting under different key");
                if let QueryResult::Success { value, .. } =
                    self.query_global_state(query_request).await
                {
                    value.into_package().map(Box::new)
                } else {
                    None
                }
            }
            QueryResult::Success { value, .. } => value
                .into_contract_package()
                .map(Package::from)
                .map(Box::new),
        }
    }

    /// Requests a balance lookup be executed on the Contract Runtime component.
    pub(crate) async fn get_balance(self, request: BalanceRequest) -> BalanceResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetBalance { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Returns a map of validator weights for all eras as known from `root_hash`.
    ///
    /// This operation is read only.
    pub(crate) async fn get_era_validators_from_contract_runtime(
        self,
        request: EraValidatorsRequest,
    ) -> EraValidatorsResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetEraValidators { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Requests the seigniorage recipients snapshot from the Contract Runtime component.
    pub(crate) async fn get_seigniorage_recipients_snapshot_from_contract_runtime(
        self,
        request: SeigniorageRecipientsRequest,
    ) -> SeigniorageRecipientsResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetSeigniorageRecipients { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Requests a tagged-values query be executed on the Contract Runtime component.
    pub(crate) async fn get_tagged_values(self, request: TaggedValuesRequest) -> TaggedValuesResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetTaggedValues { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Requests a query by prefix be executed on the Contract Runtime component.
    pub(crate) async fn get_prefixed_values(
        self,
        request: PrefixedValuesRequest,
    ) -> PrefixedValuesResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::QueryByPrefix { request, responder },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Returns the value of the execution results checksum stored in the ChecksumRegistry for the
    /// given state root hash.
    pub(crate) async fn get_execution_results_checksum(
        self,
        state_root_hash: Digest,
    ) -> ExecutionResultsChecksumResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::GetExecutionResultsChecksum {
                state_root_hash,
                responder,
            },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Get our public key from consensus, and if we're a validator, the next round length.
    pub(crate) async fn consensus_status(self) -> Option<ConsensusStatus>
    where
        REv: From<ConsensusRequest>,
    {
        self.make_request(ConsensusRequest::Status, QueueKind::Consensus)
            .await
    }

    /// Returns a list of validator status changes, by public key.
    pub(crate) async fn get_consensus_validator_changes(self) -> ConsensusValidatorChanges
    where
        REv: From<ConsensusRequest>,
    {
        self.make_request(ConsensusRequest::ValidatorChanges, QueueKind::Consensus)
            .await
    }

    /// Dump consensus state for a specific era, using the supplied function to serialize the
    /// output.
    pub(crate) async fn diagnostics_port_dump_consensus_state(
        self,
        era_id: Option<EraId>,
        serialize: fn(&EraDump<'_>) -> Result<Vec<u8>, Cow<'static, str>>,
    ) -> Result<Vec<u8>, Cow<'static, str>>
    where
        REv: From<DumpConsensusStateRequest>,
    {
        self.make_request(
            |responder| DumpConsensusStateRequest {
                era_id,
                serialize,
                responder,
            },
            QueueKind::Control,
        )
        .await
    }

    /// Dump the event queue contents to the diagnostics port, using the given serializer.
    pub(crate) async fn diagnostics_port_dump_queue(self, dump_format: QueueDumpFormat)
    where
        REv: From<ControlAnnouncement>,
    {
        self.make_request(
            |responder| ControlAnnouncement::QueueDumpRequest {
                dump_format,
                finished: responder,
            },
            QueueKind::Control,
        )
        .await;
    }

    /// Activates/deactivates a failpoint from a given activation.
    pub(crate) async fn activate_failpoint(self, activation: FailpointActivation)
    where
        REv: From<ControlAnnouncement>,
    {
        self.event_queue
            .schedule(
                ControlAnnouncement::ActivateFailpoint { activation },
                QueueKind::Control,
            )
            .await;
    }

    /// Announces that the node should be shut down due to a request from a user.
    pub(crate) async fn announce_user_shutdown_request(self)
    where
        REv: From<ControlAnnouncement>,
    {
        self.event_queue
            .schedule(
                ControlAnnouncement::ShutdownDueToUserRequest,
                QueueKind::Control,
            )
            .await;
    }

    /// Announce that a block which wasn't previously stored on this node has been fetched and
    /// stored.
    pub(crate) async fn announce_fetched_new_block(self, block: Arc<Block>, peer: NodeId)
    where
        REv: From<FetchedNewBlockAnnouncement>,
    {
        self.event_queue
            .schedule(
                FetchedNewBlockAnnouncement { block, peer },
                QueueKind::Fetch,
            )
            .await;
    }

    /// Announce that a finality signature which wasn't previously stored on this node has been
    /// fetched and stored.
    pub(crate) async fn announce_fetched_new_finality_signature(
        self,
        finality_signature: Box<FinalitySignature>,
        peer: NodeId,
    ) where
        REv: From<FetchedNewFinalitySignatureAnnouncement>,
    {
        self.event_queue
            .schedule(
                FetchedNewFinalitySignatureAnnouncement {
                    finality_signature,
                    peer,
                },
                QueueKind::Fetch,
            )
            .await;
    }

    /// Gets the raw bytes of the chainspec file, plus the `genesis_accounts` and `global_state`
    /// bytes if those files are present.
    pub(crate) async fn get_chainspec_raw_bytes(self) -> Arc<ChainspecRawBytes>
    where
        REv: From<ChainspecRawBytesRequest> + Send,
    {
        self.make_request(
            ChainspecRawBytesRequest::GetChainspecRawBytes,
            QueueKind::NetworkInfo,
        )
        .await
    }

    /// Stores a set of given finalized approvals in storage.
    ///
    /// Any previously stored finalized approvals for the given hash are quietly overwritten.
    pub(crate) async fn store_finalized_approvals(
        self,
        transaction_hash: TransactionHash,
        finalized_approvals: BTreeSet<Approval>,
    ) -> bool
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::StoreFinalizedApprovals {
                transaction_hash,
                finalized_approvals,
                responder,
            },
            QueueKind::ToStorage,
        )
        .await
    }

    /// Requests execution of a single transaction, without committing its effects. Intended to be
    /// used for debugging & discovery purposes.
    pub(crate) async fn speculatively_execute(
        self,
        block_header: Box<BlockHeader>,
        transaction: Box<Transaction>,
    ) -> SpeculativeExecutionResult
    where
        REv: From<ContractRuntimeRequest>,
    {
        self.make_request(
            |responder| ContractRuntimeRequest::SpeculativelyExecute {
                block_header,
                transaction,
                responder,
            },
            QueueKind::ContractRuntime,
        )
        .await
    }

    /// Reads block execution results (or chunk) from Storage component.
    pub(crate) async fn get_block_execution_results_or_chunk_from_storage(
        self,
        id: BlockExecutionResultsOrChunkId,
    ) -> Option<BlockExecutionResultsOrChunk>
    where
        REv: From<StorageRequest>,
    {
        self.make_request(
            |responder| StorageRequest::GetBlockExecutionResultsOrChunk { id, responder },
            QueueKind::FromStorage,
        )
        .await
    }

    /// Gets peers for a given block from the block accumulator.
    pub(crate) async fn get_block_accumulated_peers(
        self,
        block_hash: BlockHash,
    ) -> Option<Vec<NodeId>>
    where
        REv: From<BlockAccumulatorRequest>,
    {
        self.make_request(
            |responder| BlockAccumulatorRequest::GetPeersForBlock {
                block_hash,
                responder,
            },
            QueueKind::NetworkInfo,
        )
        .await
    }

    /// Set a new stopping point for the node.
    ///
    /// Returns the previously set stop-at spec, if any.
    pub(crate) async fn set_node_stop_at(self, stop_at: Option<StopAtSpec>) -> Option<StopAtSpec>
    where
        REv: From<SetNodeStopRequest>,
    {
        self.make_request(
            |responder| SetNodeStopRequest { stop_at, responder },
            QueueKind::Control,
        )
        .await
    }
}

/// Construct a fatal error effect.
///
/// This macro is a convenient wrapper around `EffectBuilder::fatal` that inserts the current file
/// name (`file!()`) and line number (`line!()`) automatically.
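///
/// A usage sketch, assuming a component's event handler has an `effect_builder` in scope. The
/// `error` binding and the message text are hypothetical, and the snippet is marked `ignore`
/// because it is not a standalone program:
///
/// ```ignore
/// // Expands to `effect_builder.fatal(file!(), line!(), format!(...))`. The return value is
/// // then dropped via `EffectExt::ignore`, as described in the module documentation.
/// return fatal!(effect_builder, "could not read block: {}", error).ignore();
/// ```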
#[macro_export]
macro_rules! fatal {
    ($effect_builder:expr, $($arg:tt)*) => {
        $effect_builder.fatal(file!(), line!(), format!($($arg)*))
    };
}