polkadot_node_subsystem_util/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Utility module for subsystems
18//!
19//! Many subsystems have common interests such as canceling a bunch of spawned jobs,
20//! or determining what their validator ID is. These common interests are factored into
21//! this module.
22//!
23//! This crate also reexports Prometheus metric types which are expected to be implemented by
24//! subsystems.
25
26#![warn(missing_docs)]
27
28pub use overseer::{
29	gen::{OrchestraError as OverseerError, Timeout},
30	Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33	errors::{RuntimeApiError, SubsystemError},
34	messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35	overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44	slashing,
45	vstaging::{
46		async_backing::{BackingState, Constraints},
47		CandidateEvent, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState,
48		ScrapedOnChainVotes,
49	},
50	AsyncBackingParams, AuthorityDiscoveryId, CandidateHash, CoreIndex, EncodeAs, ExecutorParams,
51	GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures, OccupiedCoreAssumption,
52	PersistedValidationData, SessionIndex, SessionInfo, Signed, SigningContext, ValidationCode,
53	ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
54};
55pub use rand;
56use sp_application_crypto::AppCrypto;
57use sp_core::ByteArray;
58use sp_keystore::{Error as KeystoreError, KeystorePtr};
59use std::{
60	collections::{BTreeMap, VecDeque},
61	time::Duration,
62};
63use thiserror::Error;
64
65pub use determine_new_blocks::determine_new_blocks;
66pub use metered;
67pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
68
/// These reexports are required so that external crates can use the `delegated_subsystem` macro
/// properly.
pub mod reexports {
	// All four items are forwarded unchanged from `polkadot_overseer::gen`.
	pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
}
74
75/// Helpers for the validator->chunk index mapping.
76pub mod availability_chunks;
77/// A utility for managing the implicit view of the relay-chain derived from active
78/// leaves and the minimum allowed relay-parents that parachain candidates can have
79/// and be backed in those leaves' children.
80pub mod backing_implicit_view;
81/// Database trait for subsystem.
82pub mod database;
83/// An emulator for node-side code to predict the results of on-chain parachain inclusion
84/// and predict future constraints.
85pub mod inclusion_emulator;
86/// Convenient and efficient runtime info access.
87pub mod runtime;
88
89/// Helpers for working with unreleased runtime calls
90pub mod vstaging;
91
92/// Nested message sending
93///
94/// Useful for having mostly synchronous code, with submodules spawning short lived asynchronous
95/// tasks, sending messages back.
96pub mod nesting_sender;
97
98pub mod reputation;
99
100mod determine_new_blocks;
101
102mod controlled_validator_indices;
103pub use controlled_validator_indices::ControlledValidatorIndices;
104
105#[cfg(test)]
106mod tests;
107
/// Log target used by the tracing statements in this module.
// `const` items are implicitly `'static`, so the lifetime does not need to be spelled out.
const LOG_TARGET: &str = "parachain::subsystem-util";

/// Duration a job will wait after sending a stop signal before hard-aborting.
pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
/// Capacity of channels to and from individual jobs
pub const JOB_CHANNEL_CAPACITY: usize = 64;
114
/// Utility errors
///
/// Variants marked `#[error(transparent)]` forward their `Display` output to the wrapped error;
/// the `#[from]` attribute additionally generates the matching `From` conversion, so those
/// errors can be propagated with `?`.
#[derive(Debug, Error)]
pub enum Error {
	/// Attempted to send or receive on a oneshot channel which had been canceled
	#[error(transparent)]
	Oneshot(#[from] oneshot::Canceled),
	/// Attempted to send on a MPSC channel which has been canceled
	#[error(transparent)]
	Mpsc(#[from] mpsc::SendError),
	/// A subsystem error
	#[error(transparent)]
	Subsystem(#[from] SubsystemError),
	/// An error in the Runtime API.
	#[error(transparent)]
	RuntimeApi(#[from] RuntimeApiError),
	/// The type system wants this even though it doesn't make sense
	#[error(transparent)]
	Infallible(#[from] std::convert::Infallible),
	/// Attempted to convert from an `AllMessages` to a `FromJob`, and failed.
	///
	/// Note: the carried `String` is not part of the displayed message.
	#[error("AllMessage not relevant to Job")]
	SenderConversion(String),
	/// The local node is not a validator.
	#[error("Node is not a validator")]
	NotAValidator,
	/// Already forwarding errors to another sender
	#[error("AlreadyForwarding")]
	AlreadyForwarding,
	/// Data that is supposed to be there is not there
	#[error("Data are not available")]
	DataNotAvailable,
}
146
147impl From<OverseerError> for Error {
148	fn from(e: OverseerError) -> Self {
149		Self::from(SubsystemError::from(e))
150	}
151}
152
153impl TryFrom<crate::runtime::Error> for Error {
154	type Error = ();
155
156	fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
157		use crate::runtime::Error;
158
159		match e {
160			Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
161			Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
162			Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
163		}
164	}
165}
166
/// A type alias for Runtime API receivers.
///
/// This is the receiving half of a oneshot channel. Awaiting it yields either the requested
/// value `T` or a [`RuntimeApiError`]; the await itself fails with `oneshot::Canceled` if the
/// sending half is dropped without answering.
pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
169
170/// Request some data from the `RuntimeApi`.
171pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
172	parent: Hash,
173	sender: &mut Sender,
174	request_builder: RequestBuilder,
175) -> RuntimeApiReceiver<Response>
176where
177	RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
178	Sender: SubsystemSender<RuntimeApiMessage>,
179{
180	let (tx, rx) = oneshot::channel();
181
182	sender
183		.send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
184		.await;
185
186	rx
187}
188
189/// Verifies if `ParachainHost` runtime api is at least at version `required_runtime_version`. This
190/// method is used to determine if a given runtime call is supported by the runtime.
191pub async fn has_required_runtime<Sender>(
192	sender: &mut Sender,
193	relay_parent: Hash,
194	required_runtime_version: u32,
195) -> bool
196where
197	Sender: SubsystemSender<RuntimeApiMessage>,
198{
199	gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
200
201	let (tx, rx) = oneshot::channel();
202	sender
203		.send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
204		.await;
205
206	match rx.await {
207		Result::Ok(Ok(runtime_version)) => {
208			gum::trace!(
209				target: LOG_TARGET,
210				?relay_parent,
211				?runtime_version,
212				?required_runtime_version,
213				"Fetched  ParachainHost runtime api version"
214			);
215			runtime_version >= required_runtime_version
216		},
217		Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
218			gum::trace!(
219				target: LOG_TARGET,
220				?relay_parent,
221				?error,
222				"Execution error while fetching ParachainHost runtime api version"
223			);
224			false
225		},
226		Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
227			gum::trace!(
228				target: LOG_TARGET,
229				?relay_parent,
230				"NotSupported error while fetching ParachainHost runtime api version"
231			);
232			false
233		},
234		Result::Err(_) => {
235			gum::trace!(
236				target: LOG_TARGET,
237				?relay_parent,
238				"Cancelled error while fetching ParachainHost runtime api version"
239			);
240			false
241		},
242	}
243}
244
/// Construct specialized request functions for the runtime.
///
/// These would otherwise get pretty repetitive.
///
/// Each entry has the shape `fn name(params...) -> ReturnTy; RequestVariant;` and expands to a
/// `pub async fn name(parent, params..., sender)` which forwards to `request_from_runtime` using
/// `RuntimeApiRequest::RequestVariant` and returns a `RuntimeApiReceiver<ReturnTy>`.
macro_rules! specialize_requests {
	// expand return type name for documentation purposes
	// (single entry: re-dispatch to the `named` arm with the stringified request variant as the
	// name used in the generated doc comment)
	(fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		specialize_requests!{
			named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
		}
	};

	// create a single specialized request function
	// (the generated function takes `parent` first, then the declared parameters, then the
	// sender; the oneshot sender `tx` is passed as the last argument of the request variant)
	(named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
		#[doc = "Request `"]
		#[doc = $doc_name]
		#[doc = "` from the runtime"]
		pub async fn $func_name (
			parent: Hash,
			$(
				$param_name: $param_ty,
			)*
			sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
		) -> RuntimeApiReceiver<$return_ty>
		{
			request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
				$( $param_name, )* tx
			)).await
		}
	};

	// recursive decompose
	// (one entry followed by at least one more: expand the head on its own, then recurse on the
	// remaining entries)
	(
		fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
		$(
			fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
		)+
	) => {
		specialize_requests!{
			fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
		}
		specialize_requests!{
			$(
				fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
			)+
		}
	};
}
292
// Generates one `pub async fn request_*` helper per line below. Each line reads
// `fn <helper name>(<extra params>) -> <response type>; <RuntimeApiRequest variant>;`.
specialize_requests! {
	fn request_runtime_api_version() -> u32; Version;
	fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
	fn request_validators() -> Vec<ValidatorId>; Validators;
	fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
	fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
	fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
	fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
	fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
	fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
	fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
	fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
	fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
	fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
	fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
	fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
		-> Option<ValidationCodeHash>; ValidationCodeHash;
	fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
	fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
	fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashes;
	fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
	fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
	fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
	fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
	fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
	fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
	fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
	fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
	fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
	fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;

}
325
326/// Requests executor parameters from the runtime effective at given relay-parent. First obtains
327/// session index at the relay-parent, relying on the fact that it should be cached by the runtime
328/// API caching layer even if the block itself has already been pruned. Then requests executor
329/// parameters by session index.
330/// Returns an error if failed to communicate to the runtime, or the parameters are not in the
331/// storage, which should never happen.
332/// Returns default execution parameters if the runtime doesn't yet support `SessionExecutorParams`
333/// API call.
334/// Otherwise, returns execution parameters returned by the runtime.
335pub async fn executor_params_at_relay_parent(
336	relay_parent: Hash,
337	sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
338) -> Result<ExecutorParams, Error> {
339	match request_session_index_for_child(relay_parent, sender).await.await {
340		Err(err) => {
341			// Failed to communicate with the runtime
342			Err(Error::Oneshot(err))
343		},
344		Ok(Err(err)) => {
345			// Runtime has failed to obtain a session index at the relay-parent.
346			Err(Error::RuntimeApi(err))
347		},
348		Ok(Ok(session_index)) => {
349			match request_session_executor_params(relay_parent, session_index, sender).await.await {
350				Err(err) => {
351					// Failed to communicate with the runtime
352					Err(Error::Oneshot(err))
353				},
354				Ok(Err(RuntimeApiError::NotSupported { .. })) => {
355					// Runtime doesn't yet support the api requested, should execute anyway
356					// with default set of parameters
357					Ok(ExecutorParams::default())
358				},
359				Ok(Err(err)) => {
360					// Runtime failed to execute the request
361					Err(Error::RuntimeApi(err))
362				},
363				Ok(Ok(None)) => {
364					// Storage doesn't contain a parameter set for the given session; should
365					// never happen
366					Err(Error::DataNotAvailable)
367				},
368				Ok(Ok(Some(executor_params))) => Ok(executor_params),
369			}
370		},
371	}
372}
373
374/// From the given set of validators, find the first key we can sign with, if any.
375pub fn signing_key<'a>(
376	validators: impl IntoIterator<Item = &'a ValidatorId>,
377	keystore: &KeystorePtr,
378) -> Option<ValidatorId> {
379	signing_key_and_index(validators, keystore).map(|(k, _)| k)
380}
381
382/// From the given set of validators, find the first key we can sign with, if any, and return it
383/// along with the validator index.
384pub fn signing_key_and_index<'a>(
385	validators: impl IntoIterator<Item = &'a ValidatorId>,
386	keystore: &KeystorePtr,
387) -> Option<(ValidatorId, ValidatorIndex)> {
388	for (i, v) in validators.into_iter().enumerate() {
389		if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
390			return Some((v.clone(), ValidatorIndex(i as _)))
391		}
392	}
393	None
394}
395
396/// Sign the given data with the given validator ID.
397///
398/// Returns `Ok(None)` if the private key that corresponds to that validator ID is not found in the
399/// given keystore. Returns an error if the key could not be used for signing.
400pub fn sign(
401	keystore: &KeystorePtr,
402	key: &ValidatorId,
403	data: &[u8],
404) -> Result<Option<ValidatorSignature>, KeystoreError> {
405	let signature = keystore
406		.sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
407		.map(|sig| sig.into());
408	Ok(signature)
409}
410
411/// Find the validator group the given validator index belongs to.
412pub fn find_validator_group(
413	groups: &[Vec<ValidatorIndex>],
414	index: ValidatorIndex,
415) -> Option<GroupIndex> {
416	groups.iter().enumerate().find_map(|(i, g)| {
417		if g.contains(&index) {
418			Some(GroupIndex(i as _))
419		} else {
420			None
421		}
422	})
423}
424
425/// Choose a random subset of `min` elements.
426/// But always include `is_priority` elements.
427pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
428	choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
429}
430
431/// Choose a random subset of `min` elements using a specific Random Generator `Rng`
432/// But always include `is_priority` elements.
433pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
434	is_priority: F,
435	v: &mut Vec<T>,
436	rng: &mut R,
437	min: usize,
438) {
439	use rand::seq::SliceRandom as _;
440
441	// partition the elements into priority first
442	// the returned index is when non_priority elements start
443	let i = itertools::partition(v.iter_mut(), is_priority);
444
445	if i >= min || v.len() <= i {
446		v.truncate(i);
447		return
448	}
449
450	v[i..].shuffle(rng);
451
452	v.truncate(min);
453}
454
455/// Returns a `bool` with a probability of `a / b` of being true.
456pub fn gen_ratio(a: usize, b: usize) -> bool {
457	gen_ratio_rng(a, b, &mut rand::thread_rng())
458}
459
460/// Returns a `bool` with a probability of `a / b` of being true.
461pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
462	rng.gen_ratio(a as u32, b as u32)
463}
464
/// Local validator information
///
/// It can be created if the local node is a validator in the context of a particular
/// relay chain block.
#[derive(Debug)]
pub struct Validator {
	/// Session index and parent hash this validator signs payloads under.
	signing_context: SigningContext,
	/// The validator key that was found in the local keystore.
	key: ValidatorId,
	/// Position of `key` within the validator set it was looked up in.
	index: ValidatorIndex,
	/// Whether `index` was listed among the disabled validators at construction time.
	disabled: bool,
}
476
477impl Validator {
478	/// Get a struct representing this node's validator if this node is in fact a validator in the
479	/// context of the given block.
480	pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
481	where
482		S: SubsystemSender<RuntimeApiMessage>,
483	{
484		// Note: request_validators, request_disabled_validators and request_session_index_for_child
485		// do not and cannot run concurrently: they both have a mutable handle to the same sender.
486		// However, each of them returns a oneshot::Receiver, and those are resolved concurrently.
487		let (validators, disabled_validators, session_index) = futures::try_join!(
488			request_validators(parent, sender).await,
489			request_disabled_validators(parent, sender).await,
490			request_session_index_for_child(parent, sender).await,
491		)?;
492
493		let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
494
495		let validators = validators?;
496
497		let disabled_validators = disabled_validators?;
498
499		Self::construct(&validators, &disabled_validators, signing_context, keystore)
500	}
501
502	/// Construct a validator instance without performing runtime fetches.
503	///
504	/// This can be useful if external code also needs the same data.
505	pub fn construct(
506		validators: &[ValidatorId],
507		disabled_validators: &[ValidatorIndex],
508		signing_context: SigningContext,
509		keystore: KeystorePtr,
510	) -> Result<Self, Error> {
511		let (key, index) =
512			signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
513
514		let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
515
516		Ok(Validator { signing_context, key, index, disabled })
517	}
518
519	/// Get this validator's id.
520	pub fn id(&self) -> ValidatorId {
521		self.key.clone()
522	}
523
524	/// Get this validator's local index.
525	pub fn index(&self) -> ValidatorIndex {
526		self.index
527	}
528
529	/// Get the enabled/disabled state of this validator
530	pub fn disabled(&self) -> bool {
531		self.disabled
532	}
533
534	/// Get the current signing context.
535	pub fn signing_context(&self) -> &SigningContext {
536		&self.signing_context
537	}
538
539	/// Sign a payload with this validator
540	pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
541		&self,
542		keystore: KeystorePtr,
543		payload: Payload,
544	) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
545		Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
546	}
547}