1#![warn(missing_docs)]
27
28pub use overseer::{
29 gen::{OrchestraError as OverseerError, Timeout},
30 Subsystem, TimeoutExt,
31};
32use polkadot_node_subsystem::{
33 errors::{RuntimeApiError, SubsystemError},
34 messages::{RuntimeApiMessage, RuntimeApiRequest, RuntimeApiSender},
35 overseer, SubsystemSender,
36};
37
38pub use polkadot_node_metrics::{metrics, Metronome};
39
40use codec::Encode;
41use futures::channel::{mpsc, oneshot};
42
43use polkadot_primitives::{
44 async_backing::{BackingState, Constraints},
45 slashing, AsyncBackingParams, AuthorityDiscoveryId, CandidateEvent, CandidateHash,
46 CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, CoreState, EncodeAs,
47 ExecutorParams, GroupIndex, GroupRotationInfo, Hash, Id as ParaId, NodeFeatures,
48 OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, SessionIndex,
49 SessionInfo, Signed, SigningContext, ValidationCode, ValidationCodeHash, ValidatorId,
50 ValidatorIndex, ValidatorSignature,
51};
52pub use rand;
53use sp_application_crypto::AppCrypto;
54use sp_core::ByteArray;
55use sp_keystore::{Error as KeystoreError, KeystorePtr};
56use std::{
57 collections::{BTreeMap, VecDeque},
58 time::Duration,
59};
60use thiserror::Error;
61
62pub use determine_new_blocks::determine_new_blocks;
63pub use metered;
64pub use polkadot_node_network_protocol::MIN_GOSSIP_PEERS;
65
66pub mod reexports {
69 pub use polkadot_overseer::gen::{SpawnedSubsystem, Spawner, Subsystem, SubsystemContext};
70}
71
72pub mod availability_chunks;
74pub mod backing_implicit_view;
78pub mod database;
80pub mod inclusion_emulator;
83pub mod runtime;
85
86pub mod vstaging;
88
89pub mod nesting_sender;
94
95pub mod reputation;
96
97mod determine_new_blocks;
98
99mod controlled_validator_indices;
100pub use controlled_validator_indices::ControlledValidatorIndices;
101
102#[cfg(test)]
103mod tests;
104
105const LOG_TARGET: &'static str = "parachain::subsystem-util";
106
107pub const JOB_GRACEFUL_STOP_DURATION: Duration = Duration::from_secs(1);
109pub const JOB_CHANNEL_CAPACITY: usize = 64;
111
112#[derive(Debug, Error)]
114pub enum Error {
115 #[error(transparent)]
117 Oneshot(#[from] oneshot::Canceled),
118 #[error(transparent)]
120 Mpsc(#[from] mpsc::SendError),
121 #[error(transparent)]
123 Subsystem(#[from] SubsystemError),
124 #[error(transparent)]
126 RuntimeApi(#[from] RuntimeApiError),
127 #[error(transparent)]
129 Infallible(#[from] std::convert::Infallible),
130 #[error("AllMessage not relevant to Job")]
132 SenderConversion(String),
133 #[error("Node is not a validator")]
135 NotAValidator,
136 #[error("AlreadyForwarding")]
138 AlreadyForwarding,
139 #[error("Data are not available")]
141 DataNotAvailable,
142}
143
144impl From<OverseerError> for Error {
145 fn from(e: OverseerError) -> Self {
146 Self::from(SubsystemError::from(e))
147 }
148}
149
150impl TryFrom<crate::runtime::Error> for Error {
151 type Error = ();
152
153 fn try_from(e: crate::runtime::Error) -> Result<Self, ()> {
154 use crate::runtime::Error;
155
156 match e {
157 Error::RuntimeRequestCanceled(e) => Ok(Self::Oneshot(e)),
158 Error::RuntimeRequest(e) => Ok(Self::RuntimeApi(e)),
159 Error::NoSuchSession(_) | Error::NoExecutorParams(_) => Err(()),
160 }
161 }
162}
163
164pub type RuntimeApiReceiver<T> = oneshot::Receiver<Result<T, RuntimeApiError>>;
166
167pub async fn request_from_runtime<RequestBuilder, Response, Sender>(
169 parent: Hash,
170 sender: &mut Sender,
171 request_builder: RequestBuilder,
172) -> RuntimeApiReceiver<Response>
173where
174 RequestBuilder: FnOnce(RuntimeApiSender<Response>) -> RuntimeApiRequest,
175 Sender: SubsystemSender<RuntimeApiMessage>,
176{
177 let (tx, rx) = oneshot::channel();
178
179 sender
180 .send_message(RuntimeApiMessage::Request(parent, request_builder(tx)).into())
181 .await;
182
183 rx
184}
185
186pub async fn has_required_runtime<Sender>(
189 sender: &mut Sender,
190 relay_parent: Hash,
191 required_runtime_version: u32,
192) -> bool
193where
194 Sender: SubsystemSender<RuntimeApiMessage>,
195{
196 gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version");
197
198 let (tx, rx) = oneshot::channel();
199 sender
200 .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx)))
201 .await;
202
203 match rx.await {
204 Result::Ok(Ok(runtime_version)) => {
205 gum::trace!(
206 target: LOG_TARGET,
207 ?relay_parent,
208 ?runtime_version,
209 ?required_runtime_version,
210 "Fetched ParachainHost runtime api version"
211 );
212 runtime_version >= required_runtime_version
213 },
214 Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => {
215 gum::trace!(
216 target: LOG_TARGET,
217 ?relay_parent,
218 ?error,
219 "Execution error while fetching ParachainHost runtime api version"
220 );
221 false
222 },
223 Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => {
224 gum::trace!(
225 target: LOG_TARGET,
226 ?relay_parent,
227 "NotSupported error while fetching ParachainHost runtime api version"
228 );
229 false
230 },
231 Result::Err(_) => {
232 gum::trace!(
233 target: LOG_TARGET,
234 ?relay_parent,
235 "Cancelled error while fetching ParachainHost runtime api version"
236 );
237 false
238 },
239 }
240}
241
242macro_rules! specialize_requests {
246 (fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
248 specialize_requests!{
249 named stringify!($request_variant) ; fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant;
250 }
251 };
252
253 (named $doc_name:expr ; fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;) => {
255 #[doc = "Request `"]
256 #[doc = $doc_name]
257 #[doc = "` from the runtime"]
258 pub async fn $func_name (
259 parent: Hash,
260 $(
261 $param_name: $param_ty,
262 )*
263 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
264 ) -> RuntimeApiReceiver<$return_ty>
265 {
266 request_from_runtime(parent, sender, |tx| RuntimeApiRequest::$request_variant(
267 $( $param_name, )* tx
268 )).await
269 }
270 };
271
272 (
274 fn $func_name:ident( $( $param_name:ident : $param_ty:ty ),* ) -> $return_ty:ty ; $request_variant:ident;
275 $(
276 fn $t_func_name:ident( $( $t_param_name:ident : $t_param_ty:ty ),* ) -> $t_return_ty:ty ; $t_request_variant:ident;
277 )+
278 ) => {
279 specialize_requests!{
280 fn $func_name( $( $param_name : $param_ty ),* ) -> $return_ty ; $request_variant ;
281 }
282 specialize_requests!{
283 $(
284 fn $t_func_name( $( $t_param_name : $t_param_ty ),* ) -> $t_return_ty ; $t_request_variant ;
285 )+
286 }
287 };
288}
289
290specialize_requests! {
291 fn request_runtime_api_version() -> u32; Version;
292 fn request_authorities() -> Vec<AuthorityDiscoveryId>; Authorities;
293 fn request_validators() -> Vec<ValidatorId>; Validators;
294 fn request_validator_groups() -> (Vec<Vec<ValidatorIndex>>, GroupRotationInfo); ValidatorGroups;
295 fn request_availability_cores() -> Vec<CoreState>; AvailabilityCores;
296 fn request_persisted_validation_data(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<PersistedValidationData>; PersistedValidationData;
297 fn request_assumed_validation_data(para_id: ParaId, expected_persisted_validation_data_hash: Hash) -> Option<(PersistedValidationData, ValidationCodeHash)>; AssumedValidationData;
298 fn request_session_index_for_child() -> SessionIndex; SessionIndexForChild;
299 fn request_validation_code(para_id: ParaId, assumption: OccupiedCoreAssumption) -> Option<ValidationCode>; ValidationCode;
300 fn request_validation_code_by_hash(validation_code_hash: ValidationCodeHash) -> Option<ValidationCode>; ValidationCodeByHash;
301 fn request_candidate_pending_availability(para_id: ParaId) -> Option<CommittedCandidateReceipt>; CandidatePendingAvailability;
302 fn request_candidates_pending_availability(para_id: ParaId) -> Vec<CommittedCandidateReceipt>; CandidatesPendingAvailability;
303 fn request_candidate_events() -> Vec<CandidateEvent>; CandidateEvents;
304 fn request_session_info(index: SessionIndex) -> Option<SessionInfo>; SessionInfo;
305 fn request_validation_code_hash(para_id: ParaId, assumption: OccupiedCoreAssumption)
306 -> Option<ValidationCodeHash>; ValidationCodeHash;
307 fn request_on_chain_votes() -> Option<ScrapedOnChainVotes>; FetchOnChainVotes;
308 fn request_session_executor_params(session_index: SessionIndex) -> Option<ExecutorParams>;SessionExecutorParams;
309 fn request_unapplied_slashes() -> Vec<(SessionIndex, CandidateHash, slashing::LegacyPendingSlashes)>; UnappliedSlashes;
310 fn request_unapplied_slashes_v2() -> Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>; UnappliedSlashesV2;
311 fn request_key_ownership_proof(validator_id: ValidatorId) -> Option<slashing::OpaqueKeyOwnershipProof>; KeyOwnershipProof;
312 fn request_submit_report_dispute_lost(dp: slashing::DisputeProof, okop: slashing::OpaqueKeyOwnershipProof) -> Option<()>; SubmitReportDisputeLost;
313 fn request_disabled_validators() -> Vec<ValidatorIndex>; DisabledValidators;
314 fn request_async_backing_params() -> AsyncBackingParams; AsyncBackingParams;
315 fn request_claim_queue() -> BTreeMap<CoreIndex, VecDeque<ParaId>>; ClaimQueue;
316 fn request_para_backing_state(para_id: ParaId) -> Option<BackingState>; ParaBackingState;
317 fn request_backing_constraints(para_id: ParaId) -> Option<Constraints>; BackingConstraints;
318 fn request_min_backing_votes(session_index: SessionIndex) -> u32; MinimumBackingVotes;
319 fn request_node_features(session_index: SessionIndex) -> NodeFeatures; NodeFeatures;
320 fn request_para_ids(session_index: SessionIndex) -> Vec<ParaId>; ParaIds;
321
322}
323
324pub async fn executor_params_at_relay_parent(
334 relay_parent: Hash,
335 sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
336) -> Result<ExecutorParams, Error> {
337 match request_session_index_for_child(relay_parent, sender).await.await {
338 Err(err) => {
339 Err(Error::Oneshot(err))
341 },
342 Ok(Err(err)) => {
343 Err(Error::RuntimeApi(err))
345 },
346 Ok(Ok(session_index)) => {
347 match request_session_executor_params(relay_parent, session_index, sender).await.await {
348 Err(err) => {
349 Err(Error::Oneshot(err))
351 },
352 Ok(Err(RuntimeApiError::NotSupported { .. })) => {
353 Ok(ExecutorParams::default())
356 },
357 Ok(Err(err)) => {
358 Err(Error::RuntimeApi(err))
360 },
361 Ok(Ok(None)) => {
362 Err(Error::DataNotAvailable)
365 },
366 Ok(Ok(Some(executor_params))) => Ok(executor_params),
367 }
368 },
369 }
370}
371
372pub fn signing_key<'a>(
374 validators: impl IntoIterator<Item = &'a ValidatorId>,
375 keystore: &KeystorePtr,
376) -> Option<ValidatorId> {
377 signing_key_and_index(validators, keystore).map(|(k, _)| k)
378}
379
380pub fn signing_key_and_index<'a>(
383 validators: impl IntoIterator<Item = &'a ValidatorId>,
384 keystore: &KeystorePtr,
385) -> Option<(ValidatorId, ValidatorIndex)> {
386 for (i, v) in validators.into_iter().enumerate() {
387 if keystore.has_keys(&[(v.to_raw_vec(), ValidatorId::ID)]) {
388 return Some((v.clone(), ValidatorIndex(i as _)))
389 }
390 }
391 None
392}
393
394pub fn sign(
399 keystore: &KeystorePtr,
400 key: &ValidatorId,
401 data: &[u8],
402) -> Result<Option<ValidatorSignature>, KeystoreError> {
403 let signature = keystore
404 .sr25519_sign(ValidatorId::ID, key.as_ref(), data)?
405 .map(|sig| sig.into());
406 Ok(signature)
407}
408
409pub fn find_validator_group(
411 groups: &[Vec<ValidatorIndex>],
412 index: ValidatorIndex,
413) -> Option<GroupIndex> {
414 groups.iter().enumerate().find_map(|(i, g)| {
415 if g.contains(&index) {
416 Some(GroupIndex(i as _))
417 } else {
418 None
419 }
420 })
421}
422
423pub fn choose_random_subset<T, F: FnMut(&T) -> bool>(is_priority: F, v: &mut Vec<T>, min: usize) {
426 choose_random_subset_with_rng(is_priority, v, &mut rand::thread_rng(), min)
427}
428
429pub fn choose_random_subset_with_rng<T, F: FnMut(&T) -> bool, R: rand::Rng>(
432 is_priority: F,
433 v: &mut Vec<T>,
434 rng: &mut R,
435 min: usize,
436) {
437 use rand::seq::SliceRandom as _;
438
439 let i = itertools::partition(v.iter_mut(), is_priority);
442
443 if i >= min || v.len() <= i {
444 v.truncate(i);
445 return
446 }
447
448 v[i..].shuffle(rng);
449
450 v.truncate(min);
451}
452
453pub fn gen_ratio(a: usize, b: usize) -> bool {
455 gen_ratio_rng(a, b, &mut rand::thread_rng())
456}
457
458pub fn gen_ratio_rng<R: rand::Rng>(a: usize, b: usize, rng: &mut R) -> bool {
460 rng.gen_ratio(a as u32, b as u32)
461}
462
463#[derive(Debug)]
468pub struct Validator {
469 signing_context: SigningContext,
470 key: ValidatorId,
471 index: ValidatorIndex,
472 disabled: bool,
473}
474
475impl Validator {
476 pub async fn new<S>(parent: Hash, keystore: KeystorePtr, sender: &mut S) -> Result<Self, Error>
479 where
480 S: SubsystemSender<RuntimeApiMessage>,
481 {
482 let (validators, disabled_validators, session_index) = futures::try_join!(
486 request_validators(parent, sender).await,
487 request_disabled_validators(parent, sender).await,
488 request_session_index_for_child(parent, sender).await,
489 )?;
490
491 let signing_context = SigningContext { session_index: session_index?, parent_hash: parent };
492
493 let validators = validators?;
494
495 let disabled_validators = disabled_validators?;
496
497 Self::construct(&validators, &disabled_validators, signing_context, keystore)
498 }
499
500 pub fn construct(
504 validators: &[ValidatorId],
505 disabled_validators: &[ValidatorIndex],
506 signing_context: SigningContext,
507 keystore: KeystorePtr,
508 ) -> Result<Self, Error> {
509 let (key, index) =
510 signing_key_and_index(validators, &keystore).ok_or(Error::NotAValidator)?;
511
512 let disabled = disabled_validators.iter().any(|d: &ValidatorIndex| *d == index);
513
514 Ok(Validator { signing_context, key, index, disabled })
515 }
516
517 pub fn id(&self) -> ValidatorId {
519 self.key.clone()
520 }
521
522 pub fn index(&self) -> ValidatorIndex {
524 self.index
525 }
526
527 pub fn disabled(&self) -> bool {
529 self.disabled
530 }
531
532 pub fn signing_context(&self) -> &SigningContext {
534 &self.signing_context
535 }
536
537 pub fn sign<Payload: EncodeAs<RealPayload>, RealPayload: Encode>(
539 &self,
540 keystore: KeystorePtr,
541 payload: Payload,
542 ) -> Result<Option<Signed<Payload, RealPayload>>, KeystoreError> {
543 Signed::sign(&keystore, payload, &self.signing_context, self.index, &self.key)
544 }
545}