1pub(crate) mod announcements;
94pub(crate) mod diagnostics_port;
95pub(crate) mod incoming;
96pub(crate) mod requests;
97
98use std::{
99 any::type_name,
100 borrow::Cow,
101 collections::{BTreeMap, BTreeSet, HashMap, HashSet},
102 fmt::{self, Debug, Display, Formatter},
103 future::Future,
104 mem,
105 sync::Arc,
106 time::{Duration, Instant},
107};
108
109use datasize::DataSize;
110use futures::{channel::oneshot, future::BoxFuture, FutureExt};
111use once_cell::sync::Lazy;
112use serde::{Serialize, Serializer};
113use smallvec::{smallvec, SmallVec};
114use tokio::{sync::Semaphore, time};
115use tracing::{debug, error, warn};
116
117use casper_binary_port::{
118 ConsensusStatus, ConsensusValidatorChanges, LastProgress, NetworkName, RecordId, Uptime,
119};
120use casper_storage::{
121 block_store::types::ApprovalsHashes,
122 data_access_layer::{
123 prefixed_values::{PrefixedValuesRequest, PrefixedValuesResult},
124 tagged_values::{TaggedValuesRequest, TaggedValuesResult},
125 AddressableEntityResult, BalanceRequest, BalanceResult, EraValidatorsRequest,
126 EraValidatorsResult, ExecutionResultsChecksumResult, PutTrieRequest, PutTrieResult,
127 QueryRequest, QueryResult, SeigniorageRecipientsRequest, SeigniorageRecipientsResult,
128 TrieRequest, TrieResult,
129 },
130 DbRawBytesSpec,
131};
132use casper_types::{
133 execution::{Effects as ExecutionEffects, ExecutionResult},
134 Approval, AvailableBlockRange, Block, BlockHash, BlockHeader, BlockSignatures,
135 BlockSynchronizerStatus, BlockV2, ChainspecRawBytes, DeployHash, Digest, EntityAddr, EraId,
136 ExecutionInfo, FinalitySignature, FinalitySignatureId, FinalitySignatureV2, HashAddr, Key,
137 NextUpgrade, Package, PackageAddr, ProtocolUpgradeConfig, PublicKey, TimeDiff, Timestamp,
138 Transaction, TransactionHash, TransactionId, Transfer, U512,
139};
140
141use crate::{
142 components::{
143 block_synchronizer::{
144 GlobalStateSynchronizerError, GlobalStateSynchronizerResponse, TrieAccumulatorError,
145 TrieAccumulatorResponse,
146 },
147 consensus::{ClContext, EraDump, ProposedBlock},
148 contract_runtime::SpeculativeExecutionResult,
149 diagnostics_port::StopAtSpec,
150 fetcher::{FetchItem, FetchResult},
151 gossiper::GossipItem,
152 network::{blocklist::BlocklistJustification, FromIncoming, NetworkInsights},
153 transaction_acceptor,
154 },
155 contract_runtime::ExecutionPreState,
156 failpoints::FailpointActivation,
157 reactor::{main_reactor::ReactorState, EventQueueHandle, QueueKind},
158 types::{
159 appendable_block::AppendableBlock, BlockExecutionResultsOrChunk,
160 BlockExecutionResultsOrChunkId, BlockWithMetadata, ExecutableBlock, FinalizedBlock,
161 InvalidProposalError, LegacyDeploy, MetaBlock, MetaBlockState, NodeId, TransactionHeader,
162 },
163 utils::{fmt_limit::FmtLimit, SharedFlag, Source},
164};
165use announcements::{
166 BlockAccumulatorAnnouncement, ConsensusAnnouncement, ContractRuntimeAnnouncement,
167 ControlAnnouncement, FatalAnnouncement, FetchedNewBlockAnnouncement,
168 FetchedNewFinalitySignatureAnnouncement, GossiperAnnouncement, MetaBlockAnnouncement,
169 PeerBehaviorAnnouncement, QueueDumpFormat, TransactionAcceptorAnnouncement,
170 TransactionBufferAnnouncement, UnexecutedBlockAnnouncement, UpgradeWatcherAnnouncement,
171};
172use casper_storage::data_access_layer::EntryPointExistsResult;
173use diagnostics_port::DumpConsensusStateRequest;
174use requests::{
175 AcceptTransactionRequest, BeginGossipRequest, BlockAccumulatorRequest,
176 BlockSynchronizerRequest, BlockValidationRequest, ChainspecRawBytesRequest, ConsensusRequest,
177 ContractRuntimeRequest, FetcherRequest, MakeBlockExecutableRequest, MarkBlockCompletedRequest,
178 MetricsRequest, NetworkInfoRequest, NetworkRequest, ReactorInfoRequest, SetNodeStopRequest,
179 StorageRequest, SyncGlobalStateRequest, TransactionBufferRequest, TrieAccumulatorRequest,
180 UpgradeWatcherRequest,
181};
182
183static UNOBTAINABLE: Lazy<Semaphore> = Lazy::new(|| Semaphore::new(0));
185
186pub(crate) type Effect<Ev> = BoxFuture<'static, Multiple<Ev>>;
188
189pub(crate) type Effects<Ev> = Multiple<Effect<Ev>>;
191
192pub(crate) type Multiple<T> = SmallVec<[T; 2]>;
199
200#[derive(Debug, Serialize, PartialEq, Eq, Hash, Copy, Clone, DataSize)]
202pub(crate) enum GossipTarget {
203 Mixed(EraId),
205 All,
207}
208
209impl Display for GossipTarget {
210 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
211 match self {
212 GossipTarget::Mixed(era_id) => write!(formatter, "gossip target mixed for {}", era_id),
213 GossipTarget::All => write!(formatter, "gossip target all"),
214 }
215 }
216}
217
218#[must_use]
220#[derive(DataSize)]
221pub(crate) struct Responder<T> {
222 sender: Option<oneshot::Sender<T>>,
224 is_shutting_down: SharedFlag,
226}
227
228#[must_use]
230#[derive(DataSize, Debug)]
231pub(crate) struct AutoClosingResponder<T>(Responder<Option<T>>);
232
233impl<T> AutoClosingResponder<T> {
234 pub(crate) fn from_opt_responder(responder: Responder<Option<T>>) -> Self {
236 AutoClosingResponder(responder)
237 }
238
239 fn into_inner(mut self) -> Responder<Option<T>> {
241 let is_shutting_down = self.0.is_shutting_down;
242 mem::replace(
243 &mut self.0,
244 Responder {
245 sender: None,
246 is_shutting_down,
247 },
248 )
249 }
250}
251
252impl<T: Debug> AutoClosingResponder<T> {
253 pub(crate) async fn respond(self, data: T) {
255 self.into_inner().respond(Some(data)).await;
256 }
257
258 pub(crate) async fn respond_none(self) {
260 self.into_inner().respond(None).await;
261 }
262}
263
264impl<T> Drop for AutoClosingResponder<T> {
265 fn drop(&mut self) {
266 if let Some(sender) = self.0.sender.take() {
267 debug!(
268 sending_value = %self.0,
269 "responding None by dropping auto-close responder"
270 );
271 if let Err(_unsent_value) = sender.send(None) {
273 debug!(
274 unsent_value = %self.0,
275 "failed to auto-close responder, ignoring"
276 );
277 }
278 }
279 }
280}
281
282impl<T: 'static + Send> Responder<T> {
283 #[inline]
285 fn new(sender: oneshot::Sender<T>, is_shutting_down: SharedFlag) -> Self {
286 Responder {
287 sender: Some(sender),
288 is_shutting_down,
289 }
290 }
291
292 #[cfg(test)]
297 #[inline]
298 pub(crate) fn without_shutdown(sender: oneshot::Sender<T>) -> Self {
299 Responder::new(sender, SharedFlag::global_shared())
300 }
301}
302
303impl<T: Debug> Responder<T> {
304 pub(crate) async fn respond(mut self, data: T) {
306 if let Some(sender) = self.sender.take() {
307 if let Err(data) = sender.send(data) {
308 debug!(
313 data=?FmtLimit::new(1000, &data),
314 "ignored failure to send response to request down oneshot channel"
315 );
316 }
317 } else {
318 error!(
319 data=?FmtLimit::new(1000, &data),
320 "tried to send a value down a responder channel, but it was already used"
321 );
322 }
323 }
324}
325
326impl<T> Debug for Responder<T> {
327 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
328 write!(formatter, "Responder<{}>", type_name::<T>(),)
329 }
330}
331
332impl<T> Display for Responder<T> {
333 fn fmt(&self, formatter: &mut Formatter<'_>) -> fmt::Result {
334 write!(formatter, "responder({})", type_name::<T>(),)
335 }
336}
337
338impl<T> Drop for Responder<T> {
339 fn drop(&mut self) {
340 if self.sender.is_some() {
341 if self.is_shutting_down.is_set() {
342 debug!(
343 responder=?self,
344 "ignored dropping of responder during shutdown"
345 );
346 } else {
347 error!(
351 responder=?self,
352 "dropped without being responded to outside of shutdown"
353 );
354 }
355 }
356 }
357}
358
359impl<T> Serialize for Responder<T> {
360 fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
361 serializer.serialize_str(&format!("{:?}", self))
362 }
363}
364
365impl<T> Serialize for AutoClosingResponder<T> {
366 fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
367 self.0.serialize(serializer)
368 }
369}
370
371pub(crate) trait EffectExt: Future + Send {
373 fn event<U, F>(self, f: F) -> Effects<U>
377 where
378 F: FnOnce(Self::Output) -> U + 'static + Send,
379 U: 'static,
380 Self: Sized;
381
382 fn ignore<Ev>(self) -> Effects<Ev>;
384}
385
386pub(crate) trait EffectResultExt {
389 type Value;
391 type Error;
393
394 fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
399 where
400 F: FnOnce(Self::Value) -> U + 'static + Send,
401 G: FnOnce(Self::Error) -> U + 'static + Send,
402 U: 'static;
403}
404
405impl<T> EffectExt for T
406where
407 T: Future + Send + 'static + Sized,
408{
409 fn event<U, F>(self, f: F) -> Effects<U>
410 where
411 F: FnOnce(Self::Output) -> U + 'static + Send,
412 U: 'static,
413 {
414 smallvec![self.map(f).map(|item| smallvec![item]).boxed()]
415 }
416
417 fn ignore<Ev>(self) -> Effects<Ev> {
418 smallvec![self.map(|_| Multiple::new()).boxed()]
419 }
420}
421
422impl<T, V, E> EffectResultExt for T
423where
424 T: Future<Output = Result<V, E>> + Send + 'static + Sized,
425{
426 type Value = V;
427 type Error = E;
428
429 fn result<U, F, G>(self, f_ok: F, f_err: G) -> Effects<U>
430 where
431 F: FnOnce(V) -> U + 'static + Send,
432 G: FnOnce(E) -> U + 'static + Send,
433 U: 'static,
434 {
435 smallvec![self
436 .map(|result| result.map_or_else(f_err, f_ok))
437 .map(|item| smallvec![item])
438 .boxed()]
439 }
440}
441
442#[derive(Debug)]
450pub(crate) struct EffectBuilder<REv: 'static> {
451 event_queue: EventQueueHandle<REv>,
453}
454
455impl<REv> Clone for EffectBuilder<REv> {
457 fn clone(&self) -> Self {
458 *self
459 }
460}
461
462impl<REv> Copy for EffectBuilder<REv> {}
463
464impl<REv> EffectBuilder<REv> {
465 pub(crate) fn new(event_queue: EventQueueHandle<REv>) -> Self {
467 EffectBuilder { event_queue }
468 }
469
470 pub(crate) fn into_inner(self) -> EventQueueHandle<REv> {
472 self.event_queue
473 }
474
475 pub(crate) async fn make_request<T, Q, F>(self, f: F, queue_kind: QueueKind) -> T
491 where
492 T: Send + 'static,
493 Q: Into<REv>,
494 F: FnOnce(Responder<T>) -> Q,
495 {
496 let (event, wait_future) = self.create_request_parts(f);
497
498 self.event_queue.schedule(event, queue_kind).await;
500 wait_future.await
501 }
502
503 pub(crate) fn create_request_parts<T, Q, F>(self, f: F) -> (REv, impl Future<Output = T>)
511 where
512 T: Send + 'static,
513 Q: Into<REv>,
514 F: FnOnce(Responder<T>) -> Q,
515 {
516 let (sender, receiver) = oneshot::channel();
518
519 let responder = Responder::new(sender, self.event_queue.shutdown_flag());
521
522 let request_event = f(responder).into();
524
525 let fut = async move {
526 match receiver.await {
527 Ok(value) => value,
528 Err(err) => {
529 if self.event_queue.shutdown_flag().is_set() {
535 debug!(%err, channel=?type_name::<T>(), "ignoring closed channel due to shutdown");
536 } else {
537 error!(%err, channel=?type_name::<T>(), "request for channel closed, this may be a bug? \
538 check if a component is stuck from now on");
539 }
540
541 let _ = UNOBTAINABLE.acquire().await;
544 panic!("should never obtain unobtainable semaphore");
545 }
546 }
547 };
548
549 (request_event, fut)
550 }
551
552 #[inline(always)]
557 #[allow(clippy::manual_async_fn)]
558 pub(crate) fn immediately(self) -> impl Future<Output = ()> + Send {
559 async {}
562 }
563
564 pub(crate) async fn fatal(self, file: &'static str, line: u32, msg: String)
568 where
569 REv: From<FatalAnnouncement>,
570 {
571 self.event_queue
572 .schedule(FatalAnnouncement { file, line, msg }, QueueKind::Control)
573 .await;
574 }
575
576 pub(crate) async fn set_timeout(self, timeout: Duration) -> Duration {
578 let then = Instant::now();
579 time::sleep(timeout).await;
580 then.elapsed()
581 }
582
583 pub(crate) async fn get_metrics(self) -> Option<String>
587 where
588 REv: From<MetricsRequest>,
589 {
590 self.make_request(
591 |responder| MetricsRequest::RenderNodeMetricsText { responder },
592 QueueKind::Api,
593 )
594 .await
595 }
596
597 pub(crate) async fn send_message<P>(self, dest: NodeId, payload: P)
602 where
603 REv: From<NetworkRequest<P>>,
604 {
605 self.make_request(
606 |responder| NetworkRequest::SendMessage {
607 dest: Box::new(dest),
608 payload: Box::new(payload),
609 respond_after_queueing: false,
610 auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
611 },
612 QueueKind::Network,
613 )
614 .await;
615 }
616
617 pub(crate) async fn enqueue_message<P>(self, dest: NodeId, payload: P)
622 where
623 REv: From<NetworkRequest<P>>,
624 {
625 self.make_request(
626 |responder| NetworkRequest::SendMessage {
627 dest: Box::new(dest),
628 payload: Box::new(payload),
629 respond_after_queueing: true,
630 auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
631 },
632 QueueKind::Network,
633 )
634 .await;
635 }
636
637 pub(crate) async fn broadcast_message_to_validators<P>(self, payload: P, era_id: EraId)
639 where
640 REv: From<NetworkRequest<P>>,
641 {
642 self.make_request(
643 |responder| {
644 debug!("validator broadcast for {}", era_id);
645 NetworkRequest::ValidatorBroadcast {
646 payload: Box::new(payload),
647 era_id,
648 auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
649 }
650 },
651 QueueKind::Network,
652 )
653 .await;
654 }
655
656 pub(crate) async fn gossip_message<P>(
663 self,
664 payload: P,
665 gossip_target: GossipTarget,
666 count: usize,
667 exclude: HashSet<NodeId>,
668 ) -> HashSet<NodeId>
669 where
670 REv: From<NetworkRequest<P>>,
671 P: Send,
672 {
673 self.make_request(
674 |responder| NetworkRequest::Gossip {
675 payload: Box::new(payload),
676 gossip_target,
677 count,
678 exclude,
679 auto_closing_responder: AutoClosingResponder::from_opt_responder(responder),
680 },
681 QueueKind::Network,
682 )
683 .await
684 .unwrap_or_default()
685 }
686
687 pub(crate) async fn get_network_insights(self) -> NetworkInsights
689 where
690 REv: From<NetworkInfoRequest>,
691 {
692 self.make_request(
693 |responder| NetworkInfoRequest::Insight { responder },
694 QueueKind::Regular,
695 )
696 .await
697 }
698
699 pub(crate) async fn network_peers(self) -> BTreeMap<NodeId, String>
701 where
702 REv: From<NetworkInfoRequest>,
703 {
704 self.make_request(
705 |responder| NetworkInfoRequest::Peers { responder },
706 QueueKind::Api,
707 )
708 .await
709 }
710
711 pub async fn get_fully_connected_peers(self, count: usize) -> Vec<NodeId>
713 where
714 REv: From<NetworkInfoRequest>,
715 {
716 self.make_request(
717 |responder| NetworkInfoRequest::FullyConnectedPeers { count, responder },
718 QueueKind::NetworkInfo,
719 )
720 .await
721 }
722
723 pub(crate) async fn announce_expired_transactions(self, hashes: Vec<TransactionHash>)
725 where
726 REv: From<TransactionBufferAnnouncement>,
727 {
728 self.event_queue
729 .schedule(
730 TransactionBufferAnnouncement::TransactionsExpired(hashes),
731 QueueKind::Validation,
732 )
733 .await;
734 }
735
736 pub(crate) async fn announce_incoming<P>(self, sender: NodeId, payload: P)
738 where
739 REv: FromIncoming<P>,
740 {
741 self.event_queue
742 .schedule(
743 <REv as FromIncoming<P>>::from_incoming(sender, payload),
744 QueueKind::NetworkIncoming,
745 )
746 .await;
747 }
748
749 pub(crate) async fn announce_complete_item_received_via_gossip<T: GossipItem>(self, item: T::Id)
751 where
752 REv: From<GossiperAnnouncement<T>>,
753 {
754 assert!(
755 T::ID_IS_COMPLETE_ITEM,
756 "{} must be an item where the ID _is_ the complete item",
757 item
758 );
759 self.event_queue
760 .schedule(
761 GossiperAnnouncement::NewCompleteItem(item),
762 QueueKind::Gossip,
763 )
764 .await;
765 }
766
767 pub(crate) async fn announce_item_body_received_via_gossip<T: GossipItem>(
770 self,
771 item: Box<T>,
772 sender: NodeId,
773 ) where
774 REv: From<GossiperAnnouncement<T>>,
775 {
776 self.event_queue
777 .schedule(
778 GossiperAnnouncement::NewItemBody { item, sender },
779 QueueKind::Gossip,
780 )
781 .await;
782 }
783
784 pub(crate) async fn announce_finality_signature_accepted(
786 self,
787 finality_signature: Box<FinalitySignatureV2>,
788 ) where
789 REv: From<BlockAccumulatorAnnouncement>,
790 {
791 self.event_queue
792 .schedule(
793 BlockAccumulatorAnnouncement::AcceptedNewFinalitySignature { finality_signature },
794 QueueKind::FinalitySignature,
795 )
796 .await;
797 }
798
799 pub(crate) async fn make_block_executable(
804 self,
805 block_hash: BlockHash,
806 ) -> Option<ExecutableBlock>
807 where
808 REv: From<MakeBlockExecutableRequest>,
809 {
810 self.make_request(
811 |responder| MakeBlockExecutableRequest {
812 block_hash,
813 responder,
814 },
815 QueueKind::FromStorage,
816 )
817 .await
818 }
819
820 pub(crate) async fn mark_block_completed(self, block_height: u64) -> bool
826 where
827 REv: From<MarkBlockCompletedRequest>,
828 {
829 self.make_request(
830 |responder| MarkBlockCompletedRequest {
831 block_height,
832 responder,
833 },
834 QueueKind::FromStorage,
835 )
836 .await
837 }
838
839 pub(crate) async fn try_accept_transaction(
841 self,
842 transaction: Transaction,
843 is_speculative: bool,
844 ) -> Result<(), transaction_acceptor::Error>
845 where
846 REv: From<AcceptTransactionRequest>,
847 {
848 self.make_request(
849 |responder| AcceptTransactionRequest {
850 transaction,
851 is_speculative,
852 responder,
853 },
854 QueueKind::Api,
855 )
856 .await
857 }
858
859 pub(crate) fn announce_new_transaction_accepted(
861 self,
862 transaction: Arc<Transaction>,
863 source: Source,
864 ) -> impl Future<Output = ()>
865 where
866 REv: From<TransactionAcceptorAnnouncement>,
867 {
868 self.event_queue.schedule(
869 TransactionAcceptorAnnouncement::AcceptedNewTransaction {
870 transaction,
871 source,
872 },
873 QueueKind::Validation,
874 )
875 }
876
877 pub(crate) async fn announce_gossip_received<T>(self, item_id: T::Id, sender: NodeId)
880 where
881 REv: From<GossiperAnnouncement<T>>,
882 T: GossipItem,
883 {
884 self.event_queue
885 .schedule(
886 GossiperAnnouncement::GossipReceived { item_id, sender },
887 QueueKind::Gossip,
888 )
889 .await;
890 }
891
892 pub(crate) async fn announce_finished_gossiping<T>(self, item_id: T::Id)
894 where
895 REv: From<GossiperAnnouncement<T>>,
896 T: GossipItem,
897 {
898 self.event_queue
899 .schedule(
900 GossiperAnnouncement::FinishedGossiping(item_id),
901 QueueKind::Gossip,
902 )
903 .await;
904 }
905
906 pub(crate) fn announce_invalid_transaction(
907 self,
908 transaction: Transaction,
909 source: Source,
910 ) -> impl Future<Output = ()>
911 where
912 REv: From<TransactionAcceptorAnnouncement>,
913 {
914 self.event_queue.schedule(
915 TransactionAcceptorAnnouncement::InvalidTransaction {
916 transaction,
917 source,
918 },
919 QueueKind::Validation,
920 )
921 }
922
923 pub(crate) async fn upgrade_watcher_announcement(self, maybe_next_upgrade: Option<NextUpgrade>)
925 where
926 REv: From<UpgradeWatcherAnnouncement>,
927 {
928 self.event_queue
929 .schedule(
930 UpgradeWatcherAnnouncement(maybe_next_upgrade),
931 QueueKind::Control,
932 )
933 .await;
934 }
935
936 pub(crate) async fn announce_commit_step_success(self, era_id: EraId, effects: ExecutionEffects)
938 where
939 REv: From<ContractRuntimeAnnouncement>,
940 {
941 self.event_queue
942 .schedule(
943 ContractRuntimeAnnouncement::CommitStepSuccess { era_id, effects },
944 QueueKind::ContractRuntime,
945 )
946 .await;
947 }
948
949 pub(crate) async fn update_contract_runtime_state(self, new_pre_state: ExecutionPreState)
950 where
951 REv: From<ContractRuntimeRequest>,
952 {
953 self.event_queue
954 .schedule(
955 ContractRuntimeRequest::UpdatePreState { new_pre_state },
956 QueueKind::ContractRuntime,
957 )
958 .await;
959 }
960
961 pub(crate) async fn announce_upcoming_era_validators(
963 self,
964 era_that_is_ending: EraId,
965 upcoming_era_validators: BTreeMap<EraId, BTreeMap<PublicKey, U512>>,
966 ) where
967 REv: From<ContractRuntimeAnnouncement>,
968 {
969 self.event_queue
970 .schedule(
971 ContractRuntimeAnnouncement::UpcomingEraValidators {
972 era_that_is_ending,
973 upcoming_era_validators,
974 },
975 QueueKind::ContractRuntime,
976 )
977 .await;
978 }
979
980 pub(crate) async fn announce_new_era_gas_price(self, era_id: EraId, next_era_gas_price: u8)
981 where
982 REv: From<ContractRuntimeAnnouncement>,
983 {
984 self.event_queue
985 .schedule(
986 ContractRuntimeAnnouncement::NextEraGasPrice {
987 era_id,
988 next_era_gas_price,
989 },
990 QueueKind::ContractRuntime,
991 )
992 .await;
993 }
994
995 pub(crate) async fn begin_gossip<T>(self, item_id: T::Id, source: Source, target: GossipTarget)
997 where
998 T: GossipItem,
999 REv: From<BeginGossipRequest<T>>,
1000 {
1001 self.make_request(
1002 |responder| BeginGossipRequest {
1003 item_id,
1004 source,
1005 target,
1006 responder,
1007 },
1008 QueueKind::Gossip,
1009 )
1010 .await;
1011 }
1012
1013 pub(crate) async fn put_block_to_storage(self, block: Arc<Block>) -> bool
1015 where
1016 REv: From<StorageRequest>,
1017 {
1018 self.make_request(
1019 |responder| StorageRequest::PutBlock { block, responder },
1020 QueueKind::ToStorage,
1021 )
1022 .await
1023 }
1024
1025 pub(crate) async fn put_approvals_hashes_to_storage(
1027 self,
1028 approvals_hashes: Box<ApprovalsHashes>,
1029 ) -> bool
1030 where
1031 REv: From<StorageRequest>,
1032 {
1033 self.make_request(
1034 |responder| StorageRequest::PutApprovalsHashes {
1035 approvals_hashes,
1036 responder,
1037 },
1038 QueueKind::ToStorage,
1039 )
1040 .await
1041 }
1042
1043 pub(crate) async fn put_executed_block_to_storage(
1045 self,
1046 block: Arc<BlockV2>,
1047 approvals_hashes: Box<ApprovalsHashes>,
1048 execution_results: HashMap<TransactionHash, ExecutionResult>,
1049 ) -> bool
1050 where
1051 REv: From<StorageRequest>,
1052 {
1053 self.make_request(
1054 |responder| StorageRequest::PutExecutedBlock {
1055 block,
1056 approvals_hashes,
1057 execution_results,
1058 responder,
1059 },
1060 QueueKind::ToStorage,
1061 )
1062 .await
1063 }
1064
1065 pub(crate) async fn get_block_from_storage(self, block_hash: BlockHash) -> Option<Block>
1067 where
1068 REv: From<StorageRequest>,
1069 {
1070 self.make_request(
1071 |responder| StorageRequest::GetBlock {
1072 block_hash,
1073 responder,
1074 },
1075 QueueKind::FromStorage,
1076 )
1077 .await
1078 }
1079
1080 pub(crate) async fn get_block_utilization(
1081 self,
1082 era_id: EraId,
1083 block_height: u64,
1084 transaction_count: u64,
1085 ) -> Option<(u64, u64)>
1086 where
1087 REv: From<StorageRequest>,
1088 {
1089 self.make_request(
1090 |responder| StorageRequest::GetBlockUtilizationScore {
1091 era_id,
1092 block_height,
1093 switch_block_utilization: transaction_count,
1094 responder,
1095 },
1096 QueueKind::FromStorage,
1097 )
1098 .await
1099 }
1100
1101 pub(crate) async fn is_block_stored(self, block_hash: BlockHash) -> bool
1102 where
1103 REv: From<StorageRequest>,
1104 {
1105 self.make_request(
1106 |responder| StorageRequest::IsBlockStored {
1107 block_hash,
1108 responder,
1109 },
1110 QueueKind::FromStorage,
1111 )
1112 .await
1113 }
1114
1115 pub(crate) async fn get_approvals_hashes_from_storage(
1117 self,
1118 block_hash: BlockHash,
1119 ) -> Option<ApprovalsHashes>
1120 where
1121 REv: From<StorageRequest>,
1122 {
1123 self.make_request(
1124 |responder| StorageRequest::GetApprovalsHashes {
1125 block_hash,
1126 responder,
1127 },
1128 QueueKind::FromStorage,
1129 )
1130 .await
1131 }
1132
1133 pub(crate) async fn get_raw_data(
1134 self,
1135 record_id: RecordId,
1136 key: Vec<u8>,
1137 ) -> Option<DbRawBytesSpec>
1138 where
1139 REv: From<StorageRequest>,
1140 {
1141 self.make_request(
1142 |responder| StorageRequest::GetRawData {
1143 record_id,
1144 key,
1145 responder,
1146 },
1147 QueueKind::FromStorage,
1148 )
1149 .await
1150 }
1151
1152 pub(crate) async fn get_block_header_from_storage(
1154 self,
1155 block_hash: BlockHash,
1156 only_from_available_block_range: bool,
1157 ) -> Option<BlockHeader>
1158 where
1159 REv: From<StorageRequest>,
1160 {
1161 self.make_request(
1162 |responder| StorageRequest::GetBlockHeader {
1163 block_hash,
1164 only_from_available_block_range,
1165 responder,
1166 },
1167 QueueKind::FromStorage,
1168 )
1169 .await
1170 }
1171
1172 pub(crate) async fn get_block_header_at_height_from_storage(
1173 self,
1174 block_height: u64,
1175 only_from_available_block_range: bool,
1176 ) -> Option<BlockHeader>
1177 where
1178 REv: From<StorageRequest>,
1179 {
1180 self.make_request(
1181 |responder| StorageRequest::GetBlockHeaderByHeight {
1182 block_height,
1183 only_from_available_block_range,
1184 responder,
1185 },
1186 QueueKind::FromStorage,
1187 )
1188 .await
1189 }
1190
1191 pub(crate) async fn get_latest_switch_block_header_from_storage(self) -> Option<BlockHeader>
1192 where
1193 REv: From<StorageRequest>,
1194 {
1195 self.make_request(
1196 |responder| StorageRequest::GetLatestSwitchBlockHeader { responder },
1197 QueueKind::FromStorage,
1198 )
1199 .await
1200 }
1201
1202 pub(crate) async fn get_switch_block_header_by_era_id_from_storage(
1203 self,
1204 era_id: EraId,
1205 ) -> Option<BlockHeader>
1206 where
1207 REv: From<StorageRequest>,
1208 {
1209 self.make_request(
1210 |responder| StorageRequest::GetSwitchBlockHeaderByEra { era_id, responder },
1211 QueueKind::FromStorage,
1212 )
1213 .await
1214 }
1215
1216 pub(crate) async fn get_signature_from_storage(
1218 self,
1219 block_hash: BlockHash,
1220 public_key: PublicKey,
1221 ) -> Option<FinalitySignature>
1222 where
1223 REv: From<StorageRequest>,
1224 {
1225 self.make_request(
1226 |responder| StorageRequest::GetBlockSignature {
1227 block_hash,
1228 public_key: Box::new(public_key),
1229 responder,
1230 },
1231 QueueKind::FromStorage,
1232 )
1233 .await
1234 }
1235
1236 pub(crate) async fn get_execution_results_from_storage(
1237 self,
1238 block_hash: BlockHash,
1239 ) -> Option<Vec<(TransactionHash, TransactionHeader, ExecutionResult)>>
1240 where
1241 REv: From<StorageRequest>,
1242 {
1243 self.make_request(
1244 |responder| StorageRequest::GetExecutionResults {
1245 block_hash,
1246 responder,
1247 },
1248 QueueKind::FromStorage,
1249 )
1250 .await
1251 }
1252
1253 pub(crate) async fn put_block_header_to_storage(self, block_header: Box<BlockHeader>) -> bool
1255 where
1256 REv: From<StorageRequest>,
1257 {
1258 self.make_request(
1259 |responder| StorageRequest::PutBlockHeader {
1260 block_header,
1261 responder,
1262 },
1263 QueueKind::ToStorage,
1264 )
1265 .await
1266 }
1267
1268 pub(crate) async fn put_signatures_to_storage(self, signatures: BlockSignatures) -> bool
1273 where
1274 REv: From<StorageRequest>,
1275 {
1276 self.make_request(
1277 |responder| StorageRequest::PutBlockSignatures {
1278 signatures,
1279 responder,
1280 },
1281 QueueKind::ToStorage,
1282 )
1283 .await
1284 }
1285
1286 pub(crate) async fn put_finality_signature_to_storage(
1287 self,
1288 signature: FinalitySignature,
1289 ) -> bool
1290 where
1291 REv: From<StorageRequest>,
1292 {
1293 self.make_request(
1294 |responder| StorageRequest::PutFinalitySignature {
1295 signature: Box::new(signature),
1296 responder,
1297 },
1298 QueueKind::ToStorage,
1299 )
1300 .await
1301 }
1302
1303 pub(crate) async fn get_block_transfers_from_storage(
1305 self,
1306 block_hash: BlockHash,
1307 ) -> Option<Vec<Transfer>>
1308 where
1309 REv: From<StorageRequest>,
1310 {
1311 self.make_request(
1312 |responder| StorageRequest::GetBlockTransfers {
1313 block_hash,
1314 responder,
1315 },
1316 QueueKind::FromStorage,
1317 )
1318 .await
1319 }
1320
1321 pub(crate) async fn get_transactions_era_ids(
1324 self,
1325 transaction_hashes: HashSet<TransactionHash>,
1326 ) -> HashSet<EraId>
1327 where
1328 REv: From<StorageRequest>,
1329 {
1330 self.make_request(
1331 |responder| StorageRequest::GetTransactionsEraIds {
1332 transaction_hashes,
1333 responder,
1334 },
1335 QueueKind::FromStorage,
1336 )
1337 .await
1338 }
1339
1340 pub(crate) async fn get_highest_complete_block_from_storage(self) -> Option<Block>
1342 where
1343 REv: From<StorageRequest>,
1344 {
1345 self.make_request(
1346 |responder| StorageRequest::GetHighestCompleteBlock { responder },
1347 QueueKind::FromStorage,
1348 )
1349 .await
1350 }
1351
1352 pub(crate) async fn get_highest_complete_block_header_from_storage(self) -> Option<BlockHeader>
1354 where
1355 REv: From<StorageRequest>,
1356 {
1357 self.make_request(
1358 |responder| StorageRequest::GetHighestCompleteBlockHeader { responder },
1359 QueueKind::FromStorage,
1360 )
1361 .await
1362 }
1363
1364 pub(crate) async fn get_available_block_range_from_storage(self) -> AvailableBlockRange
1366 where
1367 REv: From<StorageRequest>,
1368 {
1369 self.make_request(
1370 |responder| StorageRequest::GetAvailableBlockRange { responder },
1371 QueueKind::FromStorage,
1372 )
1373 .await
1374 }
1375
1376 pub(crate) async fn sync_global_state(
1378 self,
1379 block_hash: BlockHash,
1380 state_root_hash: Digest,
1381 ) -> Result<GlobalStateSynchronizerResponse, GlobalStateSynchronizerError>
1382 where
1383 REv: From<SyncGlobalStateRequest>,
1384 {
1385 self.make_request(
1386 |responder| SyncGlobalStateRequest {
1387 block_hash,
1388 state_root_hash,
1389 responder,
1390 },
1391 QueueKind::SyncGlobalState,
1392 )
1393 .await
1394 }
1395
1396 pub(crate) async fn get_trie(self, request: TrieRequest) -> TrieResult
1398 where
1399 REv: From<ContractRuntimeRequest>,
1400 {
1401 self.make_request(
1402 |responder| ContractRuntimeRequest::GetTrie { request, responder },
1403 QueueKind::ContractRuntime,
1404 )
1405 .await
1406 }
1407
1408 pub(crate) async fn get_reactor_state(self) -> ReactorState
1409 where
1410 REv: From<ReactorInfoRequest>,
1411 {
1412 self.make_request(
1413 |responder| ReactorInfoRequest::ReactorState { responder },
1414 QueueKind::Regular,
1415 )
1416 .await
1417 }
1418
1419 pub(crate) async fn get_last_progress(self) -> LastProgress
1420 where
1421 REv: From<ReactorInfoRequest>,
1422 {
1423 self.make_request(
1424 |responder| ReactorInfoRequest::LastProgress { responder },
1425 QueueKind::Regular,
1426 )
1427 .await
1428 }
1429
1430 pub(crate) async fn get_uptime(self) -> Uptime
1431 where
1432 REv: From<ReactorInfoRequest>,
1433 {
1434 self.make_request(
1435 |responder| ReactorInfoRequest::Uptime { responder },
1436 QueueKind::Regular,
1437 )
1438 .await
1439 }
1440
1441 pub(crate) async fn get_network_name(self) -> NetworkName
1442 where
1443 REv: From<ReactorInfoRequest>,
1444 {
1445 self.make_request(
1446 |responder| ReactorInfoRequest::NetworkName { responder },
1447 QueueKind::Regular,
1448 )
1449 .await
1450 }
1451
1452 #[allow(unused)]
1453 pub(crate) async fn get_balance_holds_interval(self) -> TimeDiff
1454 where
1455 REv: From<ReactorInfoRequest>,
1456 {
1457 self.make_request(
1458 |responder| ReactorInfoRequest::BalanceHoldsInterval { responder },
1459 QueueKind::Regular,
1460 )
1461 .await
1462 }
1463
1464 pub(crate) async fn get_block_synchronizer_status(self) -> BlockSynchronizerStatus
1465 where
1466 REv: From<BlockSynchronizerRequest>,
1467 {
1468 self.make_request(
1469 |responder| BlockSynchronizerRequest::Status { responder },
1470 QueueKind::Regular,
1471 )
1472 .await
1473 }
1474
1475 pub(crate) async fn put_trie_if_all_children_present(
1479 self,
1480 request: PutTrieRequest,
1481 ) -> PutTrieResult
1482 where
1483 REv: From<ContractRuntimeRequest>,
1484 {
1485 self.make_request(
1486 |responder| ContractRuntimeRequest::PutTrie { request, responder },
1487 QueueKind::ContractRuntime,
1488 )
1489 .await
1490 }
1491
1492 pub(crate) async fn get_current_gas_price(self, era_id: EraId) -> Option<u8>
1493 where
1494 REv: From<ContractRuntimeRequest>,
1495 {
1496 self.make_request(
1497 |responder| ContractRuntimeRequest::GetEraGasPrice { era_id, responder },
1498 QueueKind::ContractRuntime,
1499 )
1500 .await
1501 }
1502
1503 pub(crate) async fn put_transaction_to_storage(self, transaction: Transaction) -> bool
1504 where
1505 REv: From<StorageRequest>,
1506 {
1507 self.make_request(
1508 |responder| StorageRequest::PutTransaction {
1509 transaction: Arc::new(transaction),
1510 responder,
1511 },
1512 QueueKind::ToStorage,
1513 )
1514 .await
1515 }
1516
1517 pub(crate) async fn get_transactions_from_storage(
1522 self,
1523 transaction_hashes: Vec<TransactionHash>,
1524 ) -> SmallVec<[Option<(Transaction, Option<BTreeSet<Approval>>)>; 1]>
1525 where
1526 REv: From<StorageRequest>,
1527 {
1528 self.make_request(
1529 |responder| StorageRequest::GetTransactions {
1530 transaction_hashes,
1531 responder,
1532 },
1533 QueueKind::FromStorage,
1534 )
1535 .await
1536 }
1537
1538 pub(crate) async fn get_transaction_and_exec_info_from_storage(
1540 self,
1541 transaction_hash: TransactionHash,
1542 with_finalized_approvals: bool,
1543 ) -> Option<(Transaction, Option<ExecutionInfo>)>
1544 where
1545 REv: From<StorageRequest>,
1546 {
1547 self.make_request(
1548 |responder| StorageRequest::GetTransactionAndExecutionInfo {
1549 transaction_hash,
1550 with_finalized_approvals,
1551 responder,
1552 },
1553 QueueKind::FromStorage,
1554 )
1555 .await
1556 }
1557
1558 pub(crate) async fn get_stored_legacy_deploy(
1563 self,
1564 deploy_hash: DeployHash,
1565 ) -> Option<LegacyDeploy>
1566 where
1567 REv: From<StorageRequest>,
1568 {
1569 self.make_request(
1570 |responder| StorageRequest::GetLegacyDeploy {
1571 deploy_hash,
1572 responder,
1573 },
1574 QueueKind::FromStorage,
1575 )
1576 .await
1577 }
1578
1579 pub(crate) async fn get_stored_transaction(
1584 self,
1585 transaction_id: TransactionId,
1586 ) -> Option<Transaction>
1587 where
1588 REv: From<StorageRequest>,
1589 {
1590 self.make_request(
1591 |responder| StorageRequest::GetTransaction {
1592 transaction_id,
1593 responder,
1594 },
1595 QueueKind::FromStorage,
1596 )
1597 .await
1598 }
1599
1600 pub(crate) async fn is_transaction_stored(self, transaction_id: TransactionId) -> bool
1601 where
1602 REv: From<StorageRequest>,
1603 {
1604 self.make_request(
1605 |responder| StorageRequest::IsTransactionStored {
1606 transaction_id,
1607 responder,
1608 },
1609 QueueKind::FromStorage,
1610 )
1611 .await
1612 }
1613
1614 pub(crate) async fn put_execution_artifacts_to_storage(
1617 self,
1618 block_hash: BlockHash,
1619 block_height: u64,
1620 era_id: EraId,
1621 execution_results: HashMap<TransactionHash, ExecutionResult>,
1622 ) where
1623 REv: From<StorageRequest>,
1624 {
1625 self.make_request(
1626 |responder| StorageRequest::PutExecutionResults {
1627 block_hash: Box::new(block_hash),
1628 block_height,
1629 era_id,
1630 execution_results,
1631 responder,
1632 },
1633 QueueKind::ToStorage,
1634 )
1635 .await;
1636 }
1637
1638 pub(crate) async fn get_block_at_height_with_metadata_from_storage(
1640 self,
1641 block_height: u64,
1642 only_from_available_block_range: bool,
1643 ) -> Option<BlockWithMetadata>
1644 where
1645 REv: From<StorageRequest>,
1646 {
1647 self.make_request(
1648 |responder| StorageRequest::GetBlockAndMetadataByHeight {
1649 block_height,
1650 only_from_available_block_range,
1651 responder,
1652 },
1653 QueueKind::FromStorage,
1654 )
1655 .await
1656 }
1657
1658 pub(crate) async fn collect_past_blocks_with_metadata(
1659 self,
1660 range: std::ops::Range<u64>,
1661 only_from_available_block_range: bool,
1662 ) -> Vec<Option<BlockWithMetadata>>
1663 where
1664 REv: From<StorageRequest>,
1665 {
1666 futures::future::join_all(range.into_iter().map(|block_height| {
1667 self.get_block_at_height_with_metadata_from_storage(
1668 block_height,
1669 only_from_available_block_range,
1670 )
1671 }))
1672 .await
1673 .into_iter()
1674 .collect()
1675 }
1676
1677 pub(crate) async fn get_finality_signature_from_storage(
1679 self,
1680 id: Box<FinalitySignatureId>,
1681 ) -> Option<FinalitySignature>
1682 where
1683 REv: From<StorageRequest>,
1684 {
1685 self.make_request(
1686 |responder| StorageRequest::GetFinalitySignature { id, responder },
1687 QueueKind::FromStorage,
1688 )
1689 .await
1690 }
1691
1692 pub(crate) async fn is_finality_signature_stored(self, id: Box<FinalitySignatureId>) -> bool
1693 where
1694 REv: From<StorageRequest>,
1695 {
1696 self.make_request(
1697 |responder| StorageRequest::IsFinalitySignatureStored { id, responder },
1698 QueueKind::FromStorage,
1699 )
1700 .await
1701 }
1702
1703 pub(crate) async fn fetch<T>(
1705 self,
1706 id: T::Id,
1707 peer: NodeId,
1708 validation_metadata: Box<T::ValidationMetadata>,
1709 ) -> FetchResult<T>
1710 where
1711 REv: From<FetcherRequest<T>>,
1712 T: FetchItem + 'static,
1713 {
1714 self.make_request(
1715 |responder| FetcherRequest {
1716 id,
1717 peer,
1718 validation_metadata,
1719 responder,
1720 },
1721 QueueKind::Fetch,
1722 )
1723 .await
1724 }
1725
1726 pub(crate) async fn fetch_trie(
1727 self,
1728 hash: Digest,
1729 peers: Vec<NodeId>,
1730 ) -> Result<TrieAccumulatorResponse, TrieAccumulatorError>
1731 where
1732 REv: From<TrieAccumulatorRequest>,
1733 {
1734 self.make_request(
1735 |responder| TrieAccumulatorRequest {
1736 hash,
1737 peers,
1738 responder,
1739 },
1740 QueueKind::SyncGlobalState,
1741 )
1742 .await
1743 }
1744
1745 pub(crate) async fn request_appendable_block(
1747 self,
1748 timestamp: Timestamp,
1749 era_id: EraId,
1750 request_expiry: Timestamp,
1751 ) -> AppendableBlock
1752 where
1753 REv: From<TransactionBufferRequest>,
1754 {
1755 self.make_request(
1756 |responder| TransactionBufferRequest::GetAppendableBlock {
1757 timestamp,
1758 era_id,
1759 request_expiry,
1760 responder,
1761 },
1762 QueueKind::Consensus,
1763 )
1764 .await
1765 }
1766
1767 pub(crate) async fn enqueue_block_for_execution(
1769 self,
1770 executable_block: ExecutableBlock,
1771 meta_block_state: MetaBlockState,
1772 ) where
1773 REv: From<StorageRequest> + From<ContractRuntimeRequest>,
1774 {
1775 let key_block_height_for_activation_point = self
1778 .make_request(
1779 |responder| StorageRequest::GetKeyBlockHeightForActivationPoint { responder },
1780 QueueKind::FromStorage,
1781 )
1782 .await
1783 .unwrap_or_else(|| {
1784 warn!("key block height for current activation point unknown");
1785 0
1786 });
1787
1788 self.event_queue
1789 .schedule(
1790 ContractRuntimeRequest::EnqueueBlockForExecution {
1791 executable_block,
1792 key_block_height_for_activation_point,
1793 meta_block_state,
1794 },
1795 QueueKind::ContractRuntime,
1796 )
1797 .await;
1798 }
1799
1800 pub(crate) async fn enqueue_protocol_upgrade(
1801 self,
1802 upgrade_config: ProtocolUpgradeConfig,
1803 next_block_height: u64,
1804 parent_hash: BlockHash,
1805 parent_seed: Digest,
1806 ) where
1807 REv: From<ContractRuntimeRequest>,
1808 {
1809 self.event_queue
1810 .schedule(
1811 ContractRuntimeRequest::DoProtocolUpgrade {
1812 protocol_upgrade_config: upgrade_config,
1813 next_block_height,
1814 parent_hash,
1815 parent_seed,
1816 },
1817 QueueKind::Control,
1818 )
1819 .await;
1820 }
1821
1822 pub(crate) async fn validate_block(
1825 self,
1826 sender: NodeId,
1827 proposed_block_height: u64,
1828 block: ProposedBlock<ClContext>,
1829 ) -> Result<(), Box<InvalidProposalError>>
1830 where
1831 REv: From<BlockValidationRequest>,
1832 {
1833 self.make_request(
1834 |responder| BlockValidationRequest {
1835 proposed_block_height,
1836 block,
1837 sender,
1838 responder,
1839 },
1840 QueueKind::Regular,
1841 )
1842 .await
1843 }
1844
1845 pub(crate) async fn announce_proposed_block(self, proposed_block: ProposedBlock<ClContext>)
1847 where
1848 REv: From<ConsensusAnnouncement>,
1849 {
1850 self.event_queue
1851 .schedule(
1852 ConsensusAnnouncement::Proposed(Box::new(proposed_block)),
1853 QueueKind::Consensus,
1854 )
1855 .await;
1856 }
1857
1858 pub(crate) async fn announce_finalized_block(self, finalized_block: FinalizedBlock)
1860 where
1861 REv: From<ConsensusAnnouncement>,
1862 {
1863 self.event_queue
1864 .schedule(
1865 ConsensusAnnouncement::Finalized(Box::new(finalized_block)),
1866 QueueKind::Consensus,
1867 )
1868 .await;
1869 }
1870
1871 pub(crate) async fn announce_meta_block(self, meta_block: MetaBlock)
1873 where
1874 REv: From<MetaBlockAnnouncement>,
1875 {
1876 self.event_queue
1877 .schedule(MetaBlockAnnouncement(meta_block), QueueKind::Regular)
1878 .await;
1879 }
1880
1881 pub(crate) async fn announce_unexecuted_block(self, block_height: u64)
1884 where
1885 REv: From<UnexecutedBlockAnnouncement>,
1886 {
1887 self.event_queue
1888 .schedule(
1889 UnexecutedBlockAnnouncement(block_height),
1890 QueueKind::Regular,
1891 )
1892 .await;
1893 }
1894
1895 pub(crate) async fn announce_fault_event(
1897 self,
1898 era_id: EraId,
1899 public_key: PublicKey,
1900 timestamp: Timestamp,
1901 ) where
1902 REv: From<ConsensusAnnouncement>,
1903 {
1904 self.event_queue
1905 .schedule(
1906 ConsensusAnnouncement::Fault {
1907 era_id,
1908 public_key: Box::new(public_key),
1909 timestamp,
1910 },
1911 QueueKind::Consensus,
1912 )
1913 .await;
1914 }
1915
1916 pub(crate) async fn announce_block_peer_with_justification(
1920 self,
1921 offender: NodeId,
1922 justification: BlocklistJustification,
1923 ) where
1924 REv: From<PeerBehaviorAnnouncement>,
1925 {
1926 warn!(%offender, %justification, "banning peer");
1927 self.event_queue
1928 .schedule(
1929 PeerBehaviorAnnouncement::OffenseCommitted {
1930 offender: Box::new(offender),
1931 justification: Box::new(justification),
1932 },
1933 QueueKind::NetworkInfo,
1934 )
1935 .await;
1936 }
1937
1938 pub(crate) async fn get_next_upgrade(self) -> Option<NextUpgrade>
1940 where
1941 REv: From<UpgradeWatcherRequest> + Send,
1942 {
1943 self.make_request(UpgradeWatcherRequest, QueueKind::Control)
1944 .await
1945 }
1946
1947 pub(crate) async fn query_global_state(self, request: QueryRequest) -> QueryResult
1949 where
1950 REv: From<ContractRuntimeRequest>,
1951 {
1952 self.make_request(
1953 |responder| ContractRuntimeRequest::Query { request, responder },
1954 QueueKind::ContractRuntime,
1955 )
1956 .await
1957 }
1958
1959 pub(crate) async fn get_addressable_entity(
1962 self,
1963 state_root_hash: Digest,
1964 entity_addr: EntityAddr,
1965 ) -> AddressableEntityResult
1966 where
1967 REv: From<ContractRuntimeRequest>,
1968 {
1969 self.make_request(
1970 |responder| ContractRuntimeRequest::GetAddressableEntity {
1971 state_root_hash,
1972 entity_addr,
1973 responder,
1974 },
1975 QueueKind::ContractRuntime,
1976 )
1977 .await
1978 }
1979
1980 pub(crate) async fn does_entry_point_exist(
1982 self,
1983 state_root_hash: Digest,
1984 contract_hash: HashAddr,
1985 entry_point_name: String,
1986 ) -> EntryPointExistsResult
1987 where
1988 REv: From<ContractRuntimeRequest>,
1989 {
1990 self.make_request(
1991 |responder| ContractRuntimeRequest::GetEntryPointExists {
1992 state_root_hash,
1993 contract_hash,
1994 entry_point_name,
1995 responder,
1996 },
1997 QueueKind::ContractRuntime,
1998 )
1999 .await
2000 }
2001
2002 pub(crate) async fn get_package(
2004 self,
2005 state_root_hash: Digest,
2006 package_addr: PackageAddr,
2007 ) -> Option<Box<Package>>
2008 where
2009 REv: From<ContractRuntimeRequest>,
2010 {
2011 let key = Key::Hash(package_addr);
2012 let query_request = QueryRequest::new(state_root_hash, key, vec![]);
2013
2014 match self.query_global_state(query_request).await {
2015 QueryResult::RootNotFound | QueryResult::Failure(_) => None,
2016 QueryResult::ValueNotFound(_) => {
2017 let query_request =
2018 QueryRequest::new(state_root_hash, Key::SmartContract(package_addr), vec![]);
2019 debug!("requesting under different key");
2020 if let QueryResult::Success { value, .. } =
2021 self.query_global_state(query_request).await
2022 {
2023 value.into_package().map(Box::new)
2024 } else {
2025 None
2026 }
2027 }
2028 QueryResult::Success { value, .. } => value
2029 .into_contract_package()
2030 .map(Package::from)
2031 .map(Box::new),
2032 }
2033 }
2034
2035 pub(crate) async fn get_balance(self, request: BalanceRequest) -> BalanceResult
2037 where
2038 REv: From<ContractRuntimeRequest>,
2039 {
2040 self.make_request(
2041 |responder| ContractRuntimeRequest::GetBalance { request, responder },
2042 QueueKind::ContractRuntime,
2043 )
2044 .await
2045 }
2046
2047 pub(crate) async fn get_era_validators_from_contract_runtime(
2051 self,
2052 request: EraValidatorsRequest,
2053 ) -> EraValidatorsResult
2054 where
2055 REv: From<ContractRuntimeRequest>,
2056 {
2057 self.make_request(
2058 |responder| ContractRuntimeRequest::GetEraValidators { request, responder },
2059 QueueKind::ContractRuntime,
2060 )
2061 .await
2062 }
2063
2064 pub(crate) async fn get_seigniorage_recipients_snapshot_from_contract_runtime(
2065 self,
2066 request: SeigniorageRecipientsRequest,
2067 ) -> SeigniorageRecipientsResult
2068 where
2069 REv: From<ContractRuntimeRequest>,
2070 {
2071 self.make_request(
2072 |responder| ContractRuntimeRequest::GetSeigniorageRecipients { request, responder },
2073 QueueKind::ContractRuntime,
2074 )
2075 .await
2076 }
2077
2078 pub(crate) async fn get_tagged_values(self, request: TaggedValuesRequest) -> TaggedValuesResult
2080 where
2081 REv: From<ContractRuntimeRequest>,
2082 {
2083 self.make_request(
2084 |responder| ContractRuntimeRequest::GetTaggedValues { request, responder },
2085 QueueKind::ContractRuntime,
2086 )
2087 .await
2088 }
2089
2090 pub(crate) async fn get_prefixed_values(
2091 self,
2092 request: PrefixedValuesRequest,
2093 ) -> PrefixedValuesResult
2094 where
2095 REv: From<ContractRuntimeRequest>,
2096 {
2097 self.make_request(
2098 |responder| ContractRuntimeRequest::QueryByPrefix { request, responder },
2099 QueueKind::ContractRuntime,
2100 )
2101 .await
2102 }
2103
2104 pub(crate) async fn get_execution_results_checksum(
2107 self,
2108 state_root_hash: Digest,
2109 ) -> ExecutionResultsChecksumResult
2110 where
2111 REv: From<ContractRuntimeRequest>,
2112 {
2113 self.make_request(
2114 |responder| ContractRuntimeRequest::GetExecutionResultsChecksum {
2115 state_root_hash,
2116 responder,
2117 },
2118 QueueKind::ContractRuntime,
2119 )
2120 .await
2121 }
2122
2123 pub(crate) async fn consensus_status(self) -> Option<ConsensusStatus>
2125 where
2126 REv: From<ConsensusRequest>,
2127 {
2128 self.make_request(ConsensusRequest::Status, QueueKind::Consensus)
2129 .await
2130 }
2131
2132 pub(crate) async fn get_consensus_validator_changes(self) -> ConsensusValidatorChanges
2134 where
2135 REv: From<ConsensusRequest>,
2136 {
2137 self.make_request(ConsensusRequest::ValidatorChanges, QueueKind::Consensus)
2138 .await
2139 }
2140
2141 pub(crate) async fn diagnostics_port_dump_consensus_state(
2144 self,
2145 era_id: Option<EraId>,
2146 serialize: fn(&EraDump<'_>) -> Result<Vec<u8>, Cow<'static, str>>,
2147 ) -> Result<Vec<u8>, Cow<'static, str>>
2148 where
2149 REv: From<DumpConsensusStateRequest>,
2150 {
2151 self.make_request(
2152 |responder| DumpConsensusStateRequest {
2153 era_id,
2154 serialize,
2155 responder,
2156 },
2157 QueueKind::Control,
2158 )
2159 .await
2160 }
2161
2162 pub(crate) async fn diagnostics_port_dump_queue(self, dump_format: QueueDumpFormat)
2164 where
2165 REv: From<ControlAnnouncement>,
2166 {
2167 self.make_request(
2168 |responder| ControlAnnouncement::QueueDumpRequest {
2169 dump_format,
2170 finished: responder,
2171 },
2172 QueueKind::Control,
2173 )
2174 .await;
2175 }
2176
2177 pub(crate) async fn activate_failpoint(self, activation: FailpointActivation)
2179 where
2180 REv: From<ControlAnnouncement>,
2181 {
2182 self.event_queue
2183 .schedule(
2184 ControlAnnouncement::ActivateFailpoint { activation },
2185 QueueKind::Control,
2186 )
2187 .await;
2188 }
2189
2190 pub(crate) async fn announce_user_shutdown_request(self)
2192 where
2193 REv: From<ControlAnnouncement>,
2194 {
2195 self.event_queue
2196 .schedule(
2197 ControlAnnouncement::ShutdownDueToUserRequest,
2198 QueueKind::Control,
2199 )
2200 .await;
2201 }
2202
2203 pub(crate) async fn announce_fetched_new_block(self, block: Arc<Block>, peer: NodeId)
2206 where
2207 REv: From<FetchedNewBlockAnnouncement>,
2208 {
2209 self.event_queue
2210 .schedule(
2211 FetchedNewBlockAnnouncement { block, peer },
2212 QueueKind::Fetch,
2213 )
2214 .await;
2215 }
2216
2217 pub(crate) async fn announce_fetched_new_finality_signature(
2220 self,
2221 finality_signature: Box<FinalitySignature>,
2222 peer: NodeId,
2223 ) where
2224 REv: From<FetchedNewFinalitySignatureAnnouncement>,
2225 {
2226 self.event_queue
2227 .schedule(
2228 FetchedNewFinalitySignatureAnnouncement {
2229 finality_signature,
2230 peer,
2231 },
2232 QueueKind::Fetch,
2233 )
2234 .await;
2235 }
2236
2237 pub(crate) async fn get_chainspec_raw_bytes(self) -> Arc<ChainspecRawBytes>
2240 where
2241 REv: From<ChainspecRawBytesRequest> + Send,
2242 {
2243 self.make_request(
2244 ChainspecRawBytesRequest::GetChainspecRawBytes,
2245 QueueKind::NetworkInfo,
2246 )
2247 .await
2248 }
2249
2250 pub(crate) async fn store_finalized_approvals(
2254 self,
2255 transaction_hash: TransactionHash,
2256 finalized_approvals: BTreeSet<Approval>,
2257 ) -> bool
2258 where
2259 REv: From<StorageRequest>,
2260 {
2261 self.make_request(
2262 |responder| StorageRequest::StoreFinalizedApprovals {
2263 transaction_hash,
2264 finalized_approvals,
2265 responder,
2266 },
2267 QueueKind::ToStorage,
2268 )
2269 .await
2270 }
2271
2272 pub(crate) async fn speculatively_execute(
2275 self,
2276 block_header: Box<BlockHeader>,
2277 transaction: Box<Transaction>,
2278 ) -> SpeculativeExecutionResult
2279 where
2280 REv: From<ContractRuntimeRequest>,
2281 {
2282 self.make_request(
2283 |responder| ContractRuntimeRequest::SpeculativelyExecute {
2284 block_header,
2285 transaction,
2286 responder,
2287 },
2288 QueueKind::ContractRuntime,
2289 )
2290 .await
2291 }
2292
2293 pub(crate) async fn get_block_execution_results_or_chunk_from_storage(
2295 self,
2296 id: BlockExecutionResultsOrChunkId,
2297 ) -> Option<BlockExecutionResultsOrChunk>
2298 where
2299 REv: From<StorageRequest>,
2300 {
2301 self.make_request(
2302 |responder| StorageRequest::GetBlockExecutionResultsOrChunk { id, responder },
2303 QueueKind::FromStorage,
2304 )
2305 .await
2306 }
2307
2308 pub(crate) async fn get_block_accumulated_peers(
2310 self,
2311 block_hash: BlockHash,
2312 ) -> Option<Vec<NodeId>>
2313 where
2314 REv: From<BlockAccumulatorRequest>,
2315 {
2316 self.make_request(
2317 |responder| BlockAccumulatorRequest::GetPeersForBlock {
2318 block_hash,
2319 responder,
2320 },
2321 QueueKind::NetworkInfo,
2322 )
2323 .await
2324 }
2325
2326 pub(crate) async fn set_node_stop_at(self, stop_at: Option<StopAtSpec>) -> Option<StopAtSpec>
2330 where
2331 REv: From<SetNodeStopRequest>,
2332 {
2333 self.make_request(
2334 |responder| SetNodeStopRequest { stop_at, responder },
2335 QueueKind::Control,
2336 )
2337 .await
2338 }
2339}
2340
2341#[macro_export]
2346macro_rules! fatal {
2347 ($effect_builder:expr, $($arg:tt)*) => {
2348 $effect_builder.fatal(file!(), line!(), format!($($arg)*))
2349 };
2350}