1#![warn(missing_docs)]
27
28pub use overseer::{
29 gen::{OrchestraError as OverseerError, Timeout},
30 Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33 errors::{RuntimeApiError, SubsystemError},
34 messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35 overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44 slashing,
45 vstaging::{
46 async_backing::{BackingState, Constraints},
47 CandidateEvent, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState,
48 ScrapedOnChainVotes,
49 },
50 AsyncBackingParams, AuthorityDiscoveryId, CandidateHash, CoreIndex, EncodeAs, ExecutorParams,
51 GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures, OccupiedCoreAssumption,
52 PersistedValidationData, SessionIndex, SessionInfo, Signed, SigningContext, ValidationCode,
53 ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
54};
55pub use rand;
56use sp_application_crypto::AppCrypto;
57use sp_core::ByteArray;
58use sp_keystore::{Error as KeystoreError, KeystorePtr};
59use std::{
60 collections::{BTreeMap, VecDeque},
61 time::Duration,
62};
63use thiserror::Error;
64
65pub use determine_new_blocks::determine_new_blocks;
66pub use metered;
67pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
68
69pub mod reexports {
72 pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
73}
74
75pub mod availability_chunks;
77pub mod backing_implicit_view;
81pub mod database;
83pub mod inclusion_emulator;
86pub mod runtime;
88
89pub mod vstaging;
91
92pub mod nesting_sender;
97
98pub mod reputation;
99
100mod determine_new_blocks;
101
102mod controlled_validator_indices;
103pub use controlled_validator_indices::ControlledValidatorIndices;
104
105#[cfg(test)]
106mod tests;
107
108const LOG_TARGET: &'static str = "parachain::subsystem-util";
109
110pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
112pub const JOB_CHANNEL_CAPACITY: usize = 64;
114
115#[derive(Debug, Error)]
117pub enum Error {
118 #[error(transparent)]
120 Oneshot(#[from] oneshot::Canceled),
121 #[error(transparent)]
123 Mpsc(#[from] mpsc::SendError),
124 #[error(transparent)]
126 Subsystem(#[from] SubsystemError),
127 #[error(transparent)]
129 RuntimeApi(#[from] RuntimeApiError),
130 #[error(transparent)]
132 Infallible(#[from] std::convert::Infallible),
133 #[error("AllMessage not relevant to Job")]
135 SenderConversion(String),
136 #[error("Node is not a validator")]
138 NotAValidator,
139 #[error("AlreadyForwarding")]
141 AlreadyForwarding,
142 #[error("Data are not available")]
144 DataNotAvailable,
145}
146
147impl From<OverseerError> for Error {
148 fn from(e: OverseerError) -> Self {
149 Self::from(SubsystemError::from(e))
150 }
151}
152
153impl TryFrom<crate::runtime::Error> for Error {
154 type Error = ();
155
156 fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
157 use crate::runtime::Error;
158
159 match e {
160 Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
161 Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
162 Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
163 }
164 }
165}
166
167pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
169
170pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
172 parent: Hash,
173 sender: &mut Sender,
174 request_builder: RequestBuilder,
175) -> RuntimeApiReceiver<Response>
176where
177 RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
178 Sender: SubsystemSender<RuntimeApiMessage>,
179{
180 let (tx, rx) = oneshot::channel();
181
182 sender
183 .send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
184 .await;
185
186 rx
187}
188
189pub async fn has_required_runtime<Sender>(
192 sender: &mut Sender,
193 relay_parent: Hash,
194 required_runtime_version: u32,
195) -> bool
196where
197 Sender: SubsystemSender<RuntimeApiMessage>,
198{
199 gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
200
201 let (tx, rx) = oneshot::channel();
202 sender
203 .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
204 .await;
205
206 match rx.await {
207 Result::Ok(Ok(runtime_version)) => {
208 gum::trace!(
209 target: LOG_TARGET,
210 ?relay_parent,
211 ?runtime_version,
212 ?required_runtime_version,
213 "Fetched ParachainHost runtime api version"
214 );
215 runtime_version >= required_runtime_version
216 },
217 Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
218 gum::trace!(
219 target: LOG_TARGET,
220 ?relay_parent,
221 ?error,
222 "Execution error while fetching ParachainHost runtime api version"
223 );
224 false
225 },
226 Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
227 gum::trace!(
228 target: LOG_TARGET,
229 ?relay_parent,
230 "NotSupported error while fetching ParachainHost runtime api version"
231 );
232 false
233 },
234 Result::Err(_) => {
235 gum::trace!(
236 target: LOG_TARGET,
237 ?relay_parent,
238 "Cancelled error while fetching ParachainHost runtime api version"
239 );
240 false
241 },
242 }
243}
244
245macro_rules! specialize_requests {
249 (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
251 specialize_requests!{
252 named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
253 }
254 };
255
256 (named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
258 #[doc = "Request `"]
259 #[doc = $doc_name]
260 #[doc = "` from the runtime"]
261 pub async fn $func_name (
262 parent: Hash,
263 $(
264 $param_name: $param_ty,
265 )*
266 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
267 ) -> RuntimeApiReceiver<$return_ty>
268 {
269 request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
270 $( $param_name, )* tx
271 )).await
272 }
273 };
274
275 (
277 fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
278 $(
279 fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
280 )+
281 ) => {
282 specialize_requests!{
283 fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
284 }
285 specialize_requests!{
286 $(
287 fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
288 )+
289 }
290 };
291}
292
293specialize_requests! {
294 fn request_runtime_api_version() -> u32; Version;
295 fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
296 fn request_validators() -> Vec<ValidatorId>; Validators;
297 fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
298 fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
299 fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
300 fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
301 fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
302 fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
303 fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
304 fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
305 fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
306 fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
307 fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
308 fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
309 -> Option<ValidationCodeHash>; ValidationCodeHash;
310 fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
311 fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
312 fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashes;
313 fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
314 fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
315 fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
316 fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
317 fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
318 fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
319 fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
320 fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
321 fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
322 fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
323
324}
325
326pub async fn executor_params_at_relay_parent(
336 relay_parent: Hash,
337 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
338) -> Result<ExecutorParams, Error> {
339 match request_session_index_for_child(relay_parent, sender).await.await {
340 Err(err) => {
341 Err(Error::Oneshot(err))
343 },
344 Ok(Err(err)) => {
345 Err(Error::RuntimeApi(err))
347 },
348 Ok(Ok(session_index)) => {
349 match request_session_executor_params(relay_parent, session_index, sender).await.await {
350 Err(err) => {
351 Err(Error::Oneshot(err))
353 },
354 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
355 Ok(ExecutorParams::default())
358 },
359 Ok(Err(err)) => {
360 Err(Error::RuntimeApi(err))
362 },
363 Ok(Ok(None)) => {
364 Err(Error::DataNotAvailable)
367 },
368 Ok(Ok(Some(executor_params))) => Ok(executor_params),
369 }
370 },
371 }
372}
373
374pub fn signing_key<'a>(
376 validators: impl IntoIterator<Item = &'a ValidatorId>,
377 keystore: &KeystorePtr,
378) -> Option<ValidatorId> {
379 signing_key_and_index(validators, keystore).map(|(k, _)| k)
380}
381
382pub fn signing_key_and_index<'a>(
385 validators: impl IntoIterator<Item = &'a ValidatorId>,
386 keystore: &KeystorePtr,
387) -> Option<(ValidatorId, ValidatorIndex)> {
388 for (i, v) in validators.into_iter().enumerate() {
389 if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
390 return Some((v.clone(), ValidatorIndex(i as _)))
391 }
392 }
393 None
394}
395
396pub fn sign(
401 keystore: &KeystorePtr,
402 key: &ValidatorId,
403 data: &[u8],
404) -> Result<Option<ValidatorSignature>, KeystoreError> {
405 let signature = keystore
406 .sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
407 .map(|sig| sig.into());
408 Ok(signature)
409}
410
411pub fn find_validator_group(
413 groups: &[Vec<ValidatorIndex>],
414 index: ValidatorIndex,
415) -> Option<GroupIndex> {
416 groups.iter().enumerate().find_map(|(i, g)| {
417 if g.contains(&index) {
418 Some(GroupIndex(i as _))
419 } else {
420 None
421 }
422 })
423}
424
425pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
428 choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
429}
430
431pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
434 is_priority: F,
435 v: &mut Vec<T>,
436 rng: &mut R,
437 min: usize,
438) {
439 use rand::seq::SliceRandom as _;
440
441 let i = itertools::partition(v.iter_mut(), is_priority);
444
445 if i >= min || v.len() <= i {
446 v.truncate(i);
447 return
448 }
449
450 v[i..].shuffle(rng);
451
452 v.truncate(min);
453}
454
455pub fn gen_ratio(a: usize, b: usize) -> bool {
457 gen_ratio_rng(a, b, &mut rand::thread_rng())
458}
459
460pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
462 rng.gen_ratio(a as u32, b as u32)
463}
464
465#[derive(Debug)]
470pub struct Validator {
471 signing_context: SigningContext,
472 key: ValidatorId,
473 index: ValidatorIndex,
474 disabled: bool,
475}
476
477impl Validator {
478 pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
481 where
482 S: SubsystemSender<RuntimeApiMessage>,
483 {
484 let (validators, disabled_validators, session_index) = futures::try_join!(
488 request_validators(parent, sender).await,
489 request_disabled_validators(parent, sender).await,
490 request_session_index_for_child(parent, sender).await,
491 )?;
492
493 let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
494
495 let validators = validators?;
496
497 let disabled_validators = disabled_validators?;
498
499 Self::construct(&validators, &disabled_validators, signing_context, keystore)
500 }
501
502 pub fn construct(
506 validators: &[ValidatorId],
507 disabled_validators: &[ValidatorIndex],
508 signing_context: SigningContext,
509 keystore: KeystorePtr,
510 ) -> Result<Self, Error> {
511 let (key, index) =
512 signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
513
514 let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
515
516 Ok(Validator { signing_context, key, index, disabled })
517 }
518
519 pub fn id(&self) -> ValidatorId {
521 self.key.clone()
522 }
523
524 pub fn index(&self) -> ValidatorIndex {
526 self.index
527 }
528
529 pub fn disabled(&self) -> bool {
531 self.disabled
532 }
533
534 pub fn signing_context(&self) -> &SigningContext {
536 &self.signing_context
537 }
538
539 pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
541 &self,
542 keystore: KeystorePtr,
543 payload: Payload,
544 ) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
545 Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
546 }
547}