polkadot_node_subsystem_util/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Utility module for subsystems
18//!
19//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
20//! or determining what their validator ID is. These common interests are factored into
21//! this module.
22//!
23//! This crate also reexports Prometheus metric types which are expected to be implemented by
24//! subsystems.
25
26#![warn(missing_docs)]
27
28pub use overseer::{
29	gen::{OrchestraError as OverseerError, Timeout},
30	Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33	errors::{RuntimeApiError, SubsystemError},
34	messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35	overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44	async_backing::{BackingState, Constraints},
45	slashing, AsyncBackingParams, AuthorityDiscoveryId, CandidateEvent, CandidateHash,
46	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs,
47	ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures,
48	OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex,
49	SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId,
50	ValidatorIndex, ValidatorSignature,
51};
52pub use rand;
53use sp_application_crypto::AppCrypto;
54use sp_core::ByteArray;
55use sp_keystore::{Error as KeystoreError, KeystorePtr};
56use std::{
57	collections::{BTreeMap, VecDeque},
58	time::Duration,
59};
60use thiserror::Error;
61
62pub use determine_new_blocks::determine_new_blocks;
63pub use metered;
64pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
65
66/// These reexports are required so that external crates can use the `delegated_subsystem` macro
67/// properly.
68pub mod reexports {
69	pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
70}
71
72/// Helpers for the validator->chunk index mapping.
73pub mod availability_chunks;
74/// A utility for managing the implicit view of the relay-chain derived from active
75/// leaves and the minimum allowed relay-parents that parachain candidates can have
76/// and be backed in those leaves' children.
77pub mod backing_implicit_view;
78/// Database trait for subsystem.
79pub mod database;
80/// An emulator for node-side code to predict the results of on-chain parachain inclusion
81/// and predict future constraints.
82pub mod inclusion_emulator;
83/// Convenient and efficient runtime info access.
84pub mod runtime;
85
86/// Helpers for working with unreleased runtime calls
87pub mod vstaging;
88
89/// Nested message sending
90///
91/// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous
92/// tasks, sending messages back.
93pub mod nesting_sender;
94
95pub mod reputation;
96
97mod determine_new_blocks;
98
99mod controlled_validator_indices;
100pub use controlled_validator_indices::ControlledValidatorIndices;
101
102#[cfg(test)]
103mod tests;
104
105const LOG_TARGET: &'static str = "parachain::subsystem-util";
106
107/// Duration a job will wait after sending a stop signal before hard-aborting.
108pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
109/// Capacity of channels to and from individual jobs
110pub const JOB_CHANNEL_CAPACITY: usize = 64;
111
112/// Utility errors
113#[derive(Debug, Error)]
114pub enum Error {
115	/// Attempted to send or receive on a oneshot channel which had been canceled
116	#[error(transparent)]
117	Oneshot(#[from] oneshot::Canceled),
118	/// Attempted to send on a MPSC channel which has been canceled
119	#[error(transparent)]
120	Mpsc(#[from] mpsc::SendError),
121	/// A subsystem error
122	#[error(transparent)]
123	Subsystem(#[from] SubsystemError),
124	/// An error in the Runtime API.
125	#[error(transparent)]
126	RuntimeApi(#[from] RuntimeApiError),
127	/// The type system wants this even though it doesn't make sense
128	#[error(transparent)]
129	Infallible(#[from] std::convert::Infallible),
130	/// Attempted to convert from an `AllMessages` to a `FromJob`, and failed.
131	#[error("AllMessage not relevant to Job")]
132	SenderConversion(String),
133	/// The local node is not a validator.
134	#[error("Node is not a validator")]
135	NotAValidator,
136	/// Already forwarding errors to another sender
137	#[error("AlreadyForwarding")]
138	AlreadyForwarding,
139	/// Data that are supposed to be there a not there
140	#[error("Data are not available")]
141	DataNotAvailable,
142}
143
144impl From<OverseerError> for Error {
145	fn from(e: OverseerError) -> Self {
146		Self::from(SubsystemError::from(e))
147	}
148}
149
150impl TryFrom<crate::runtime::Error> for Error {
151	type Error = ();
152
153	fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
154		use crate::runtime::Error;
155
156		match e {
157			Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
158			Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
159			Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
160		}
161	}
162}
163
164/// A type alias for Runtime API receivers.
165pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
166
167/// Request some data from the `RuntimeApi`.
168pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
169	parent: Hash,
170	sender: &mut Sender,
171	request_builder: RequestBuilder,
172) -> RuntimeApiReceiver<Response>
173where
174	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
175	Sender: SubsystemSender<RuntimeApiMessage>,
176{
177	let (tx, rx) = oneshot::channel();
178
179	sender
180		.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
181		.await;
182
183	rx
184}
185
186/// Verifies if `ParachainHost` runtime api is at least at version `required_runtime_version`. This
187/// method is used to determine if a given runtime call is supported by the runtime.
188pub async fn has_required_runtime<Sender>(
189	sender: &mut Sender,
190	relay_parent: Hash,
191	required_runtime_version: u32,
192) -> bool
193where
194	Sender: SubsystemSender<RuntimeApiMessage>,
195{
196	gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
197
198	let (tx, rx) = oneshot::channel();
199	sender
200		.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
201		.await;
202
203	match rx.await {
204		Result::Ok(Ok(runtime_version)) => {
205			gum::trace!(
206				target: LOG_TARGET,
207				?relay_parent,
208				?runtime_version,
209				?required_runtime_version,
210				"Fetched  ParachainHost runtime api version"
211			);
212			runtime_version >= required_runtime_version
213		},
214		Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
215			gum::trace!(
216				target: LOG_TARGET,
217				?relay_parent,
218				?error,
219				"Execution error while fetching ParachainHost runtime api version"
220			);
221			false
222		},
223		Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
224			gum::trace!(
225				target: LOG_TARGET,
226				?relay_parent,
227				"NotSupported error while fetching ParachainHost runtime api version"
228			);
229			false
230		},
231		Result::Err(_) => {
232			gum::trace!(
233				target: LOG_TARGET,
234				?relay_parent,
235				"Cancelled error while fetching ParachainHost runtime api version"
236			);
237			false
238		},
239	}
240}
241
242/// Construct specialized request functions for the runtime.
243///
244/// These would otherwise get pretty repetitive.
245macro_rules! specialize_requests {
246	// expand return type name for documentation purposes
247	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
248		specialize_requests!{
249			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
250		}
251	};
252
253	// create a single specialized request function
254	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
255		#[doc = "Request `"]
256		#[doc = $doc_name]
257		#[doc = "` from the runtime"]
258		pub async fn $func_name (
259			parent: Hash,
260			$(
261				$param_name: $param_ty,
262			)*
263			sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
264		) -> RuntimeApiReceiver<$return_ty>
265		{
266			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
267				$( $param_name, )* tx
268			)).await
269		}
270	};
271
272	// recursive decompose
273	(
274		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
275		$(
276			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
277		)+
278	) => {
279		specialize_requests!{
280			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
281		}
282		specialize_requests!{
283			$(
284				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
285			)+
286		}
287	};
288}
289
290specialize_requests! {
291	fn request_runtime_api_version() -> u32; Version;
292	fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
293	fn request_validators() -> Vec<ValidatorId>; Validators;
294	fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
295	fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
296	fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
297	fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
298	fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
299	fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
300	fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
301	fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
302	fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
303	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
304	fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
305	fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
306		-> Option<ValidationCodeHash>; ValidationCodeHash;
307	fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
308	fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
309	fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::LegacyPendingSlashes)>; UnappliedSlashes;
310	fn request_unapplied_slashes_v2() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashesV2;
311	fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
312	fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
313	fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
314	fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
315	fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
316	fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
317	fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
318	fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
319	fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
320	fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
321
322}
323
324/// Requests executor parameters from the runtime effective at given relay-parent. First obtains
325/// session index at the relay-parent, relying on the fact that it should be cached by the runtime
326/// API caching layer even if the block itself has already been pruned. Then requests executor
327/// parameters by session index.
328/// Returns an error if failed to communicate to the runtime, or the parameters are not in the
329/// storage, which should never happen.
330/// Returns default execution parameters if the runtime doesn't yet support `SessionExecutorParams`
331/// API call.
332/// Otherwise, returns execution parameters returned by the runtime.
333pub async fn executor_params_at_relay_parent(
334	relay_parent: Hash,
335	sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
336) -> Result<ExecutorParams, Error> {
337	match request_session_index_for_child(relay_parent, sender).await.await {
338		Err(err) => {
339			// Failed to communicate with the runtime
340			Err(Error::Oneshot(err))
341		},
342		Ok(Err(err)) => {
343			// Runtime has failed to obtain a session index at the relay-parent.
344			Err(Error::RuntimeApi(err))
345		},
346		Ok(Ok(session_index)) => {
347			match request_session_executor_params(relay_parent, session_index, sender).await.await {
348				Err(err) => {
349					// Failed to communicate with the runtime
350					Err(Error::Oneshot(err))
351				},
352				Ok(Err(RuntimeApiError::NotSupported { .. })) => {
353					// Runtime doesn't yet support the api requested, should execute anyway
354					// with default set of parameters
355					Ok(ExecutorParams::default())
356				},
357				Ok(Err(err)) => {
358					// Runtime failed to execute the request
359					Err(Error::RuntimeApi(err))
360				},
361				Ok(Ok(None)) => {
362					// Storage doesn't contain a parameter set for the given session; should
363					// never happen
364					Err(Error::DataNotAvailable)
365				},
366				Ok(Ok(Some(executor_params))) => Ok(executor_params),
367			}
368		},
369	}
370}
371
372/// From the given set of validators, find the first key we can sign with, if any.
373pub fn signing_key<'a>(
374	validators: impl IntoIterator<Item = &'a ValidatorId>,
375	keystore: &KeystorePtr,
376) -> Option<ValidatorId> {
377	signing_key_and_index(validators, keystore).map(|(k, _)| k)
378}
379
380/// From the given set of validators, find the first key we can sign with, if any, and return it
381/// along with the validator index.
382pub fn signing_key_and_index<'a>(
383	validators: impl IntoIterator<Item = &'a ValidatorId>,
384	keystore: &KeystorePtr,
385) -> Option<(ValidatorId, ValidatorIndex)> {
386	for (i, v) in validators.into_iter().enumerate() {
387		if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
388			return Some((v.clone(), ValidatorIndex(i as _)))
389		}
390	}
391	None
392}
393
394/// Sign the given data with the given validator ID.
395///
396/// Returns `Ok(None)` if the private key that corresponds to that validator ID is not found in the
397/// given keystore. Returns an error if the key could not be used for signing.
398pub fn sign(
399	keystore: &KeystorePtr,
400	key: &ValidatorId,
401	data: &[u8],
402) -> Result<Option<ValidatorSignature>, KeystoreError> {
403	let signature = keystore
404		.sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
405		.map(|sig| sig.into());
406	Ok(signature)
407}
408
409/// Find the validator group the given validator index belongs to.
410pub fn find_validator_group(
411	groups: &[Vec<ValidatorIndex>],
412	index: ValidatorIndex,
413) -> Option<GroupIndex> {
414	groups.iter().enumerate().find_map(|(i, g)| {
415		if g.contains(&index) {
416			Some(GroupIndex(i as _))
417		} else {
418			None
419		}
420	})
421}
422
423/// Choose a random subset of `min` elements.
424/// But always include `is_priority` elements.
425pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
426	choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
427}
428
429/// Choose a random subset of `min` elements using a specific Random Generator `Rng`
430/// But always include `is_priority` elements.
431pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
432	is_priority: F,
433	v: &mut Vec<T>,
434	rng: &mut R,
435	min: usize,
436) {
437	use rand::seq::SliceRandom as _;
438
439	// partition the elements into priority first
440	// the returned index is when non_priority elements start
441	let i = itertools::partition(v.iter_mut(), is_priority);
442
443	if i >= min || v.len() <= i {
444		v.truncate(i);
445		return
446	}
447
448	v[i..].shuffle(rng);
449
450	v.truncate(min);
451}
452
453/// Returns a `bool` with a probability of `a / b` of being true.
454pub fn gen_ratio(a: usize, b: usize) -> bool {
455	gen_ratio_rng(a, b, &mut rand::thread_rng())
456}
457
458/// Returns a `bool` with a probability of `a / b` of being true.
459pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
460	rng.gen_ratio(a as u32, b as u32)
461}
462
463/// Local validator information
464///
465/// It can be created if the local node is a validator in the context of a particular
466/// relay chain block.
467#[derive(Debug)]
468pub struct Validator {
469	signing_context: SigningContext,
470	key: ValidatorId,
471	index: ValidatorIndex,
472	disabled: bool,
473}
474
475impl Validator {
476	/// Get a struct representing this node's validator if this node is in fact a validator in the
477	/// context of the given block.
478	pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
479	where
480		S: SubsystemSender<RuntimeApiMessage>,
481	{
482		// Note: request_validators, request_disabled_validators and request_session_index_for_child
483		// do not and cannot run concurrently: they both have a mutable handle to the same sender.
484		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
485		let (validators, disabled_validators, session_index) = futures::try_join!(
486			request_validators(parent, sender).await,
487			request_disabled_validators(parent, sender).await,
488			request_session_index_for_child(parent, sender).await,
489		)?;
490
491		let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
492
493		let validators = validators?;
494
495		let disabled_validators = disabled_validators?;
496
497		Self::construct(&validators, &disabled_validators, signing_context, keystore)
498	}
499
500	/// Construct a validator instance without performing runtime fetches.
501	///
502	/// This can be useful if external code also needs the same data.
503	pub fn construct(
504		validators: &[ValidatorId],
505		disabled_validators: &[ValidatorIndex],
506		signing_context: SigningContext,
507		keystore: KeystorePtr,
508	) -> Result<Self, Error> {
509		let (key, index) =
510			signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
511
512		let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
513
514		Ok(Validator { signing_context, key, index, disabled })
515	}
516
517	/// Get this validator's id.
518	pub fn id(&self) -> ValidatorId {
519		self.key.clone()
520	}
521
522	/// Get this validator's local index.
523	pub fn index(&self) -> ValidatorIndex {
524		self.index
525	}
526
527	/// Get the enabled/disabled state of this validator
528	pub fn disabled(&self) -> bool {
529		self.disabled
530	}
531
532	/// Get the current signing context.
533	pub fn signing_context(&self) -> &SigningContext {
534		&self.signing_context
535	}
536
537	/// Sign a payload with this validator
538	pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
539		&self,
540		keystore: KeystorePtr,
541		payload: Payload,
542	) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
543		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
544	}
545}