polkadot_node_core_backing/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Implements the `CandidateBackingSubsystem`.
18//!
19//! This subsystem maintains the entire responsibility of tracking parachain
20//! candidates which can be backed, as well as the issuance of statements
21//! about candidates when run on a validator node.
22//!
23//! There are two types of statements: `Seconded` and `Valid`.
24//! `Seconded` implies `Valid`, and nothing should be stated as
//! `Valid` unless it's already been `Seconded`.
26//!
27//! Validators may only second candidates which fall under their own group
28//! assignment, and they may only second one candidate per depth per active leaf.
//! Candidates which are stated as either `Seconded` or `Valid` by a majority of the
30//! assigned group of validators may be backed on-chain and proceed to the availability
31//! stage.
32//!
33//! Depth is a concept relating to asynchronous backing, by which
34//! short sub-chains of candidates are backed and extended off-chain, and then placed
35//! asynchronously into blocks of the relay chain as those are authored and as the
36//! relay-chain state becomes ready for them. Asynchronous backing allows parachains to
37//! grow mostly independently from the state of the relay chain, which gives more time for
38//! parachains to be validated and thereby increases performance.
39//!
40//! Most of the work of asynchronous backing is handled by the Prospective Parachains
41//! subsystem. The 'depth' of a parachain block with respect to a relay chain block is
42//! a measure of how many parachain blocks are between the most recent included parachain block
43//! in the post-state of the relay-chain block and the candidate. For instance,
44//! a candidate that descends directly from the most recent parachain block in the relay-chain
45//! state has depth 0. The child of that candidate would have depth 1. And so on.
46//!
47//! The candidate backing subsystem keeps track of a set of 'active leaves' which are the
48//! most recent blocks in the relay-chain (which is in fact a tree) which could be built
49//! upon. Depth is always measured against active leaves, and the valid relay-parent that
50//! each candidate can have is determined by the active leaves. The Prospective Parachains
51//! subsystem enforces that the relay-parent increases monotonically, so that logic
52//! is not handled here. By communicating with the Prospective Parachains subsystem,
53//! this subsystem extrapolates an "implicit view" from the set of currently active leaves,
54//! which determines the set of all recent relay-chain block hashes which could be relay-parents
55//! for candidates backed in children of the active leaves.
56//!
57//! In fact, this subsystem relies on the Statement Distribution subsystem to prevent spam
58//! by enforcing the rule that each validator may second at most one candidate per depth per
59//! active leaf. This bounds the number of candidates that the system needs to consider and
60//! is not handled within this subsystem, except for candidates seconded locally.
61//!
62//! This subsystem also handles relay-chain heads which don't support asynchronous backing.
63//! For such active leaves, the only valid relay-parent is the leaf hash itself and the only
64//! allowed depth is 0.
65
66#![deny(unused_crate_dependencies)]
67
68use std::{
69	collections::{HashMap, HashSet},
70	sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75	channel::{mpsc, oneshot},
76	future::BoxFuture,
77	stream::FuturesOrdered,
78	FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84	AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85	ValidationResult,
86};
87use polkadot_node_subsystem::{
88	messages::{
89		AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
90		CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
91		HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
92		ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
93		RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
94		StoreAvailableDataError,
95	},
96	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97	SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100	self as util,
101	backing_implicit_view::View as ImplicitView,
102	request_claim_queue, request_disabled_validators, request_min_backing_votes,
103	request_node_features, request_session_executor_params, request_session_index_for_child,
104	request_validator_groups, request_validators,
105	runtime::{self, ClaimQueueSnapshot},
106	Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110	vstaging::{
111		BackedCandidate, CandidateReceiptV2 as CandidateReceipt,
112		CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
113	},
114	CandidateCommitments, CandidateHash, CoreIndex, ExecutorParams, GroupIndex, GroupRotationInfo,
115	Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData, SessionIndex,
116	SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
117	ValidityAttestation,
118};
119use polkadot_statement_table::{
120	generic::AttestedCandidate as TableAttestedCandidate,
121	v2::{
122		SignedStatement as TableSignedStatement, Statement as TableStatement,
123		Summary as TableSummary,
124	},
125	Context as TableContextTrait, Table,
126};
127use sp_keystore::KeystorePtr;
128
129mod error;
130
131mod metrics;
132use self::metrics::Metrics;
133
134#[cfg(test)]
135mod tests;
136
/// Log target passed to the `gum` logging macros used in this file.
const LOG_TARGET: &str = "parachain::candidate-backing";
138
/// PoV data to validate.
///
/// Either the PoV is already at hand, or this carries the information needed to request it
/// from another validator before validation can start.
enum PoVData {
	/// Already available (from candidate selection).
	Ready(Arc<PoV>),
	/// Needs to be fetched from validator (we are checking a signed statement).
	FetchFromValidator {
		/// The validator the PoV is requested from.
		from_validator: ValidatorIndex,
		/// Hash of the candidate the PoV belongs to.
		candidate_hash: CandidateHash,
		/// Hash of the PoV to fetch.
		pov_hash: Hash,
	},
}
150
/// Command produced by a background validation task and sent back to the main task,
/// paired with the relay parent it belongs to.
enum ValidatedCandidateCommand {
	/// We were instructed to second the candidate that has been already validated.
	Second(BackgroundValidationResult),
	/// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
	/// We were not able to `Attest` because backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
}
159
160impl std::fmt::Debug for ValidatedCandidateCommand {
161	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
162		let candidate_hash = self.candidate_hash();
163		match *self {
164			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
165			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
166			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
167		}
168	}
169}
170
171impl ValidatedCandidateCommand {
172	fn candidate_hash(&self) -> CandidateHash {
173		match *self {
174			ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
175			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
176			ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
177			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
178			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
179		}
180	}
181}
182
/// The candidate backing subsystem.
///
/// Holds what is handed over to the subsystem's main loop when it is started.
pub struct CandidateBackingSubsystem {
	/// Keystore handle passed on to the main loop.
	keystore: KeystorePtr,
	/// Metrics handle passed on to the main loop.
	metrics: Metrics,
}
188
189impl CandidateBackingSubsystem {
190	/// Create a new instance of the `CandidateBackingSubsystem`.
191	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
192		Self { keystore, metrics }
193	}
194}
195
196#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
197impl<Context> CandidateBackingSubsystem
198where
199	Context: Send + Sync,
200{
201	fn start(self, ctx: Context) -> SpawnedSubsystem {
202		let future = async move {
203			run(ctx, self.keystore, self.metrics)
204				.await
205				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
206		}
207		.boxed();
208
209		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
210	}
211}
212
/// State tracked for a single relay parent that backing work is ongoing for.
struct PerRelayParentState {
	/// The hash of the relay parent on top of which this job is doing its work.
	parent: Hash,
	/// The node features.
	node_features: NodeFeatures,
	/// The executor parameters.
	executor_params: Arc<ExecutorParams>,
	/// The `CoreIndex` assigned to the local validator at this relay parent.
	assigned_core: Option<CoreIndex>,
	/// The candidates that are backed by enough validators in their group, by hash.
	backed: HashSet<CandidateHash>,
	/// The table of candidates and statements under this relay-parent.
	table: Table<TableContext>,
	/// The table context, including groups.
	table_context: TableContext,
	/// We issued `Seconded` or `Valid` statements about these candidates.
	issued_statements: HashSet<CandidateHash>,
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
	fallbacks: HashMap<CandidateHash, AttestingData>,
	/// The minimum backing votes threshold.
	minimum_backing_votes: u32,
	/// The number of cores.
	n_cores: u32,
	/// Claim queue state. If the runtime API is not available, it'll be populated with info from
	/// availability cores.
	claim_queue: ClaimQueueSnapshot,
	/// The validator index -> group mapping at this relay parent.
	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
	/// The associated group rotation information.
	group_rotation_info: GroupRotationInfo,
}
246
/// State tracked per candidate (stored in `State::per_candidate`, keyed by candidate hash).
struct PerCandidateState {
	/// The persisted validation data associated with the candidate.
	persisted_validation_data: PersistedValidationData,
	/// NOTE(review): presumably set when the local validator seconded this candidate itself;
	/// the code writing this flag is not visible here — confirm.
	seconded_locally: bool,
	/// The relay parent associated with the candidate.
	relay_parent: Hash,
}
252
/// A cache for storing data per-session to reduce repeated
/// runtime API calls and avoid redundant computations.
///
/// Every field is an independent LRU map keyed by `SessionIndex` and bounded by entry count.
struct PerSessionCache {
	/// Cache for storing validators list, retrieved from the runtime.
	validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
	/// Cache for storing node features, retrieved from the runtime.
	node_features_cache: LruMap<SessionIndex, NodeFeatures>,
	/// Cache for storing executor parameters, retrieved from the runtime.
	executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
	/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
	minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
	/// Cache for storing validator-to-group mappings, computed from validator groups.
	validator_to_group_cache:
		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}
268
269impl Default for PerSessionCache {
270	/// Creates a new `PerSessionCache` with a default capacity.
271	fn default() -> Self {
272		Self::new(2)
273	}
274}
275
276impl PerSessionCache {
277	/// Creates a new `PerSessionCache` with a given capacity.
278	fn new(capacity: u32) -> Self {
279		PerSessionCache {
280			validators_cache: LruMap::new(ByLength::new(capacity)),
281			node_features_cache: LruMap::new(ByLength::new(capacity)),
282			executor_params_cache: LruMap::new(ByLength::new(capacity)),
283			minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
284			validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
285		}
286	}
287
288	/// Gets validators from the cache or fetches them from the runtime if not present.
289	async fn validators(
290		&mut self,
291		session_index: SessionIndex,
292		parent: Hash,
293		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
294	) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
295		// Try to get the validators list from the cache.
296		if let Some(validators) = self.validators_cache.get(&session_index) {
297			return Ok(Arc::clone(validators));
298		}
299
300		// Fetch the validators list from the runtime since it was not in the cache.
301		let validators: Vec<ValidatorId> =
302			request_validators(parent, sender).await.await.map_err(|err| {
303				RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
304			})??;
305
306		// Wrap the validators list in an Arc to avoid a deep copy when storing it in the cache.
307		let validators = Arc::new(validators);
308
309		// Cache the fetched validators list for future use.
310		self.validators_cache.insert(session_index, Arc::clone(&validators));
311
312		Ok(validators)
313	}
314
315	/// Gets the node features from the cache or fetches it from the runtime if not present.
316	async fn node_features(
317		&mut self,
318		session_index: SessionIndex,
319		parent: Hash,
320		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
321	) -> Result<NodeFeatures, RuntimeApiError> {
322		// Try to get the node features from the cache.
323		if let Some(node_features) = self.node_features_cache.get(&session_index) {
324			return Ok(node_features.clone());
325		}
326
327		// Fetch the node features from the runtime since it was not in the cache.
328		let node_features = request_node_features(parent, session_index, sender)
329			.await
330			.await
331			.map_err(|err| RuntimeApiError::Execution {
332				runtime_api_name: "NodeFeatures",
333				source: Arc::new(err),
334			})??;
335
336		// Cache the fetched node features for future use.
337		self.node_features_cache.insert(session_index, node_features.clone());
338
339		Ok(node_features)
340	}
341
342	/// Gets the executor parameters from the cache or
343	/// fetches them from the runtime if not present.
344	async fn executor_params(
345		&mut self,
346		session_index: SessionIndex,
347		parent: Hash,
348		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
349	) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
350		// Try to get the executor parameters from the cache.
351		if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
352			return Ok(Arc::clone(executor_params));
353		}
354
355		// Fetch the executor parameters from the runtime since it was not in the cache.
356		let executor_params = request_session_executor_params(parent, session_index, sender)
357			.await
358			.await
359			.map_err(|err| RuntimeApiError::Execution {
360				runtime_api_name: "SessionExecutorParams",
361				source: Arc::new(err),
362			})??
363			.ok_or_else(|| RuntimeApiError::Execution {
364				runtime_api_name: "SessionExecutorParams",
365				source: Arc::new(Error::MissingExecutorParams),
366			})?;
367
368		// Wrap the executor parameters in an Arc to avoid a deep copy when storing it in the cache.
369		let executor_params = Arc::new(executor_params);
370
371		// Cache the fetched executor parameters for future use.
372		self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
373
374		Ok(executor_params)
375	}
376
377	/// Gets the minimum backing votes threshold from the
378	/// cache or fetches it from the runtime if not present.
379	async fn minimum_backing_votes(
380		&mut self,
381		session_index: SessionIndex,
382		parent: Hash,
383		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
384	) -> Result<u32, RuntimeApiError> {
385		// Try to get the value from the cache.
386		if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
387			return Ok(*minimum_backing_votes);
388		}
389
390		// Fetch the value from the runtime since it was not in the cache.
391		let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
392			.await
393			.await
394			.map_err(|err| RuntimeApiError::Execution {
395				runtime_api_name: "MinimumBackingVotes",
396				source: Arc::new(err),
397			})??;
398
399		// Cache the fetched value for future use.
400		self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
401
402		Ok(minimum_backing_votes)
403	}
404
405	/// Gets or computes the validator-to-group mapping for a session.
406	fn validator_to_group(
407		&mut self,
408		session_index: SessionIndex,
409		validators: &[ValidatorId],
410		validator_groups: &[Vec<ValidatorIndex>],
411	) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
412		let validator_to_group = self
413			.validator_to_group_cache
414			.get_or_insert(session_index, || {
415				let mut vector = vec![None; validators.len()];
416
417				for (group_idx, validator_group) in validator_groups.iter().enumerate() {
418					for validator in validator_group {
419						vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
420					}
421				}
422
423				Arc::new(IndexedVec::<_, _>::from(vector))
424			})
425			.expect("Just inserted");
426
427		Arc::clone(validator_to_group)
428	}
429}
430
/// The state of the subsystem.
///
/// A single instance is created by `run` and handed to every iteration of the main loop.
struct State {
	/// The utility for managing the implicit and explicit views in a consistent way.
	implicit_view: ImplicitView,
	/// State tracked for all relay-parents backing work is ongoing for. This includes
	/// all active leaves.
	per_relay_parent: HashMap<Hash, PerRelayParentState>,
	/// State tracked for all candidates relevant to the implicit view.
	///
	/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
	/// or explicit view for which a `Seconded` statement has been successfully imported.
	per_candidate: HashMap<CandidateHash, PerCandidateState>,
	/// A local cache for storing per-session data. This cache helps to
	/// reduce repeated calls to the runtime and avoid redundant computations.
	per_session_cache: PerSessionCache,
	/// A clonable sender which is dispatched to background candidate validation tasks to inform
	/// the main task of the result.
	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The handle to the keystore used for signing.
	keystore: KeystorePtr,
}
452
453impl State {
454	fn new(
455		background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
456		keystore: KeystorePtr,
457	) -> Self {
458		State {
459			implicit_view: ImplicitView::default(),
460			per_relay_parent: HashMap::default(),
461			per_candidate: HashMap::new(),
462			per_session_cache: PerSessionCache::default(),
463			background_validation_tx,
464			keystore,
465		}
466	}
467}
468
469#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
470async fn run<Context>(
471	mut ctx: Context,
472	keystore: KeystorePtr,
473	metrics: Metrics,
474) -> FatalResult<()> {
475	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
476	let mut state = State::new(background_validation_tx, keystore);
477
478	loop {
479		let res =
480			run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
481
482		match res {
483			Ok(()) => break,
484			Err(e) => crate::error::log_error(Err(e))?,
485		}
486	}
487
488	Ok(())
489}
490
/// Processes incoming events until the overseer asks the subsystem to conclude or an error
/// occurs.
///
/// Two event sources are polled concurrently:
/// - results of background validation tasks arriving on `background_validation_rx`, and
/// - signals and messages from the overseer received through `ctx`.
///
/// Returns `Ok(())` on `OverseerSignal::Conclude`. Errors from the individual handlers and a
/// failed `ctx.recv()` (mapped to `Error::OverseerExited`) are returned to the caller.
///
/// # Panics
///
/// Panics if the background validation channel is closed.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn run_iteration<Context>(
	ctx: &mut Context,
	state: &mut State,
	metrics: &Metrics,
	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
) -> Result<(), Error> {
	loop {
		futures::select!(
			// A background validation task reported its result.
			validated_command = background_validation_rx.next().fuse() => {
				if let Some((relay_parent, command)) = validated_command {
					handle_validated_candidate_command(
						&mut *ctx,
						state,
						relay_parent,
						command,
						metrics,
					).await?;
				} else {
					panic!("background_validation_tx always alive at this point; qed");
				}
			}
			// A signal or message from the overseer.
			from_overseer = ctx.recv().fuse() => {
				match from_overseer.map_err(Error::OverseerExited)? {
					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
						handle_active_leaves_update(
							&mut *ctx,
							update,
							state,
						).await?;
					}
					// Finality notifications are ignored by this subsystem.
					FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
					FromOrchestra::Communication { msg } => {
						handle_communication(&mut *ctx, state, msg, metrics).await?;
					}
				}
			}
		)
	}
}
532
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this: everything required for spawning a
/// validation job, plus a list of backing validators we can try.
#[derive(Clone)]
struct AttestingData {
	/// The candidate to attest.
	candidate: CandidateReceipt,
	/// Hash of the PoV we need to fetch.
	pov_hash: Hash,
	/// Validator we are currently trying to get the PoV from.
	from_validator: ValidatorIndex,
	/// Other backing validators we can try in case `from_validator` failed.
	backing: Vec<ValidatorIndex>,
}
549
/// Context handed to the statement table: who the validators are and how they are grouped.
#[derive(Default, Debug)]
struct TableContext {
	/// The local validator, if there is one.
	validator: Option<Validator>,
	/// Members of the validator group for each core, keyed by core index.
	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
	/// The validator ids.
	validators: Vec<ValidatorId>,
	/// Indices of the validators which are disabled.
	disabled_validators: Vec<ValidatorIndex>,
}
557
558impl TableContext {
559	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
560	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
561		self.disabled_validators
562			.iter()
563			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
564	}
565
566	// Returns `true` if the local validator is in the disabled validators list
567	pub fn local_validator_is_disabled(&self) -> Option<bool> {
568		self.validator.as_ref().map(|v| v.disabled())
569	}
570}
571
572impl TableContextTrait for TableContext {
573	type AuthorityId = ValidatorIndex;
574	type Digest = CandidateHash;
575	type GroupId = CoreIndex;
576	type Signature = ValidatorSignature;
577	type Candidate = CommittedCandidateReceipt;
578
579	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
580		candidate.hash()
581	}
582
583	fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
584		self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
585	}
586
587	fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
588		self.groups.get(group).map(|g| g.len())
589	}
590}
591
592// It looks like it's not possible to do an `impl From` given the current state of
593// the code. So this does the necessary conversion.
594fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
595	let statement = match s.payload() {
596		StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
597		StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
598	};
599
600	TableSignedStatement {
601		statement,
602		signature: s.signature().clone(),
603		sender: s.validator_index(),
604	}
605}
606
607fn table_attested_to_backed(
608	attested: TableAttestedCandidate<
609		CoreIndex,
610		CommittedCandidateReceipt,
611		ValidatorIndex,
612		ValidatorSignature,
613	>,
614	table_context: &TableContext,
615) -> Option<BackedCandidate> {
616	let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
617
618	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
619		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
620
621	let group = table_context.groups.get(&core_index)?;
622
623	let mut validator_indices = BitVec::with_capacity(group.len());
624
625	validator_indices.resize(group.len(), false);
626
627	// The order of the validity votes in the backed candidate must match
628	// the order of bits set in the bitfield, which is not necessarily
629	// the order of the `validity_votes` we got from the table.
630	let mut vote_positions = Vec::with_capacity(validity_votes.len());
631	for (orig_idx, id) in ids.iter().enumerate() {
632		if let Some(position) = group.iter().position(|x| x == id) {
633			validator_indices.set(position, true);
634			vote_positions.push((orig_idx, position));
635		} else {
636			gum::warn!(
637				target: LOG_TARGET,
638				"Logic error: Validity vote from table does not correspond to group",
639			);
640
641			return None
642		}
643	}
644	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
645
646	Some(BackedCandidate::new(
647		candidate,
648		vote_positions
649			.into_iter()
650			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
651			.collect(),
652		validator_indices,
653		core_index,
654	))
655}
656
657async fn store_available_data(
658	sender: &mut impl overseer::CandidateBackingSenderTrait,
659	n_validators: u32,
660	candidate_hash: CandidateHash,
661	available_data: AvailableData,
662	expected_erasure_root: Hash,
663	core_index: CoreIndex,
664	node_features: NodeFeatures,
665) -> Result<(), Error> {
666	let (tx, rx) = oneshot::channel();
667	// Important: the `av-store` subsystem will check if the erasure root of the `available_data`
668	// matches `expected_erasure_root` which was provided by the collator in the `CandidateReceipt`.
669	// This check is consensus critical and the `backing` subsystem relies on it for ensuring
670	// candidate validity.
671	sender
672		.send_message(AvailabilityStoreMessage::StoreAvailableData {
673			candidate_hash,
674			n_validators,
675			available_data,
676			expected_erasure_root,
677			core_index,
678			node_features,
679			tx,
680		})
681		.await;
682
683	rx.await
684		.map_err(Error::StoreAvailableDataChannel)?
685		.map_err(Error::StoreAvailableData)
686}
687
688// Make a `PoV` available.
689//
690// This calls the AV store to write the available data to storage. The AV store also checks the
691// erasure root matches the `expected_erasure_root`.
692// This returns `Err()` on erasure root mismatch or due to any AV store subsystem error.
693//
694// Otherwise, it returns `Ok(())`.
695async fn make_pov_available(
696	sender: &mut impl overseer::CandidateBackingSenderTrait,
697	n_validators: usize,
698	pov: Arc<PoV>,
699	candidate_hash: CandidateHash,
700	validation_data: PersistedValidationData,
701	expected_erasure_root: Hash,
702	core_index: CoreIndex,
703	node_features: NodeFeatures,
704) -> Result<(), Error> {
705	store_available_data(
706		sender,
707		n_validators as u32,
708		candidate_hash,
709		AvailableData { pov, validation_data },
710		expected_erasure_root,
711		core_index,
712		node_features,
713	)
714	.await
715}
716
717async fn request_pov(
718	sender: &mut impl overseer::CandidateBackingSenderTrait,
719	relay_parent: Hash,
720	from_validator: ValidatorIndex,
721	para_id: ParaId,
722	candidate_hash: CandidateHash,
723	pov_hash: Hash,
724) -> Result<Arc<PoV>, Error> {
725	let (tx, rx) = oneshot::channel();
726	sender
727		.send_message(AvailabilityDistributionMessage::FetchPoV {
728			relay_parent,
729			from_validator,
730			para_id,
731			candidate_hash,
732			pov_hash,
733			tx,
734		})
735		.await;
736
737	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
738	Ok(Arc::new(pov))
739}
740
741async fn request_candidate_validation(
742	sender: &mut impl overseer::CandidateBackingSenderTrait,
743	validation_data: PersistedValidationData,
744	validation_code: ValidationCode,
745	candidate_receipt: CandidateReceipt,
746	pov: Arc<PoV>,
747	executor_params: ExecutorParams,
748) -> Result<ValidationResult, Error> {
749	let (tx, rx) = oneshot::channel();
750	let is_system = candidate_receipt.descriptor.para_id().is_system();
751	let relay_parent = candidate_receipt.descriptor.relay_parent();
752
753	sender
754		.send_message(CandidateValidationMessage::ValidateFromExhaustive {
755			validation_data,
756			validation_code,
757			candidate_receipt,
758			pov,
759			executor_params,
760			exec_kind: if is_system {
761				PvfExecKind::BackingSystemParas(relay_parent)
762			} else {
763				PvfExecKind::Backing(relay_parent)
764			},
765			response_sender: tx,
766		})
767		.await;
768
769	match rx.await {
770		Ok(Ok(validation_result)) => Ok(validation_result),
771		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
772		Err(err) => Err(Error::ValidateFromExhaustive(err)),
773	}
774}
775
/// What a background validation task produces for a candidate which was found valid.
struct BackgroundValidationOutputs {
	/// The receipt of the validated candidate.
	candidate: CandidateReceipt,
	/// The commitments returned by validation.
	commitments: CandidateCommitments,
	/// The persisted validation data returned by validation.
	persisted_validation_data: PersistedValidationData,
}
781
/// Result of a background validation: the outputs for a valid candidate, or the receipt of
/// the candidate which was rejected.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
783
/// Everything a background validation task needs; consumed by `validate_and_make_available`.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
	/// Sender used to talk to the other subsystems.
	sender: S,
	/// Channel on which the resulting command is reported back.
	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The candidate to validate.
	candidate: CandidateReceipt,
	/// Relay parent used for runtime and PoV requests, and reported along with the command.
	relay_parent: Hash,
	/// Node features, forwarded to the AV store when storing the available data.
	node_features: NodeFeatures,
	/// Executor parameters, forwarded to candidate validation.
	executor_params: Arc<ExecutorParams>,
	/// Persisted validation data, forwarded to candidate validation.
	persisted_validation_data: PersistedValidationData,
	/// The PoV, or the information needed to fetch it.
	pov: PoVData,
	/// Number of validators, forwarded to the AV store when storing the available data.
	n_validators: usize,
	/// Turns the validation result into the command sent over `tx_command`.
	make_command: F,
}
796
// Validates a candidate in the background and, if it is valid, makes its PoV available.
//
// Steps:
// 1. Fetch the validation code by hash from the runtime at `relay_parent`.
// 2. Obtain the PoV: either it is ready, or it is requested from a validator. If that request
//    fails with `Error::FetchPoV`, `ValidatedCandidateCommand::AttestNoPoV` is sent over
//    `tx_command` and the function returns `Ok(())` without validating.
// 3. Request validation of the candidate.
// 4. For a valid candidate, store the available data in the AV store, which also checks the
//    erasure root.
// 5. Send the command built by `make_command` from the result over `tx_command`.
//
// The result passed to `make_command` is `Ok` with the validation outputs for a valid
// candidate whose data was stored, and `Err` with the candidate receipt if validation
// reported the candidate invalid or the erasure root did not match.
//
// Errors from the runtime API, a PoV fetch error other than `Error::FetchPoV`, the validation
// request, the AV store (other than an invalid erasure root) and from sending over
// `tx_command` are returned to the caller.
async fn validate_and_make_available(
	params: BackgroundValidationParams<
		impl overseer::CandidateBackingSenderTrait,
		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
	>,
	core_index: CoreIndex,
) -> Result<(), Error> {
	let BackgroundValidationParams {
		mut sender,
		mut tx_command,
		candidate,
		relay_parent,
		node_features,
		executor_params,
		persisted_validation_data,
		pov,
		n_validators,
		make_command,
	} = params;

	// Fetch the validation code referenced by the candidate descriptor from the runtime.
	let validation_code = {
		let validation_code_hash = candidate.descriptor().validation_code_hash();
		let (tx, rx) = oneshot::channel();
		sender
			.send_message(RuntimeApiMessage::Request(
				relay_parent,
				RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
			))
			.await;

		let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
		match code {
			Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
			Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
			Ok(Some(c)) => c,
		}
	};

	// Use the PoV at hand, or request it from the given validator.
	let pov = match pov {
		PoVData::Ready(pov) => pov,
		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
			match request_pov(
				&mut sender,
				relay_parent,
				from_validator,
				candidate.descriptor.para_id(),
				candidate_hash,
				pov_hash,
			)
			.await
			{
				// The PoV could not be fetched: report that instead of a validation result
				// and stop here.
				Err(Error::FetchPoV) => {
					tx_command
						.send((
							relay_parent,
							ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
						))
						.await
						.map_err(Error::BackgroundValidationMpsc)?;
					return Ok(())
				},
				Err(err) => return Err(err),
				Ok(pov) => pov,
			},
	};

	// Run the actual validation.
	let v = {
		request_candidate_validation(
			&mut sender,
			persisted_validation_data,
			validation_code,
			candidate.clone(),
			pov.clone(),
			executor_params.as_ref().clone(),
		)
		.await?
	};

	let res = match v {
		ValidationResult::Valid(commitments, validation_data) => {
			gum::debug!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation successful",
			);

			// Store the available data; the AV store verifies the erasure root while doing so.
			let erasure_valid = make_pov_available(
				&mut sender,
				n_validators,
				pov.clone(),
				candidate.hash(),
				validation_data.clone(),
				candidate.descriptor.erasure_root(),
				core_index,
				node_features,
			)
			.await;

			match erasure_valid {
				Ok(()) => Ok(BackgroundValidationOutputs {
					candidate,
					commitments,
					persisted_validation_data: validation_data,
				}),
				// An erasure root mismatch makes the candidate invalid rather than being an
				// error of this function.
				Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
					gum::debug!(
						target: LOG_TARGET,
						candidate_hash = ?candidate.hash(),
						actual_commitments = ?commitments,
						"Erasure root doesn't match the announced by the candidate receipt",
					);
					Err(candidate)
				},
				// Bubble up any other error.
				Err(e) => return Err(e),
			}
		},
		ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
			// If validation produces a new set of commitments, we vote the candidate as invalid.
			gum::warn!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				"Validation yielded different commitments",
			);
			Err(candidate)
		},
		ValidationResult::Invalid(reason) => {
			gum::warn!(
				target: LOG_TARGET,
				candidate_hash = ?candidate.hash(),
				reason = ?reason,
				"Validation yielded an invalid candidate",
			);
			Err(candidate)
		},
	};

	// Report the outcome back to the main task.
	tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
}
936
937#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
938async fn handle_communication<Context>(
939	ctx: &mut Context,
940	state: &mut State,
941	message: CandidateBackingMessage,
942	metrics: &Metrics,
943) -> Result<(), Error> {
944	match message {
945		CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
946			handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
947		},
948		CandidateBackingMessage::Statement(relay_parent, statement) => {
949			handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
950		},
951		CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
952			handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
953		CandidateBackingMessage::CanSecond(request, tx) =>
954			handle_can_second_request(ctx, state, request, tx).await,
955	}
956
957	Ok(())
958}
959
960#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
961async fn handle_active_leaves_update<Context>(
962	ctx: &mut Context,
963	update: ActiveLeavesUpdate,
964	state: &mut State,
965) -> Result<(), Error> {
966	// Activate in implicit view before deactivate, per the docs
967	// on ImplicitView, this is more efficient.
968	let res = if let Some(leaf) = update.activated {
969		let leaf_hash = leaf.hash;
970		Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
971	} else {
972		None
973	};
974
975	for deactivated in update.deactivated {
976		state.implicit_view.deactivate_leaf(deactivated);
977	}
978
979	// clean up `per_relay_parent` according to ancestry
980	// of leaves. we do this so we can clean up candidates right after
981	// as a result.
982	{
983		let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
984
985		state.per_relay_parent.retain(|r, _| remaining.contains(&r));
986	}
987
988	// clean up `per_candidate` according to which relay-parents
989	// are known.
990	state
991		.per_candidate
992		.retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
993
994	// Get relay parents which might be fresh but might be known already
995	// that are explicit or implicit from the new active leaf.
996	let fresh_relay_parents = match res {
997		None => return Ok(()),
998		Some((leaf, Ok(_))) => {
999			let fresh_relay_parents =
1000				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
1001
1002			let fresh_relay_parent = match fresh_relay_parents {
1003				Some(f) => f.to_vec(),
1004				None => {
1005					gum::warn!(
1006						target: LOG_TARGET,
1007						leaf_hash = ?leaf.hash,
1008						"Implicit view gave no relay-parents"
1009					);
1010
1011					vec![leaf.hash]
1012				},
1013			};
1014			fresh_relay_parent
1015		},
1016		Some((leaf, Err(e))) => {
1017			gum::debug!(
1018				target: LOG_TARGET,
1019				leaf_hash = ?leaf.hash,
1020				err = ?e,
1021				"Failed to load implicit view for leaf."
1022			);
1023
1024			return Ok(())
1025		},
1026	};
1027
1028	// add entries in `per_relay_parent`. for all new relay-parents.
1029	for maybe_new in fresh_relay_parents {
1030		if state.per_relay_parent.contains_key(&maybe_new) {
1031			continue
1032		}
1033
1034		// construct a `PerRelayParent` from the runtime API
1035		// and insert it.
1036		let per = construct_per_relay_parent_state(
1037			ctx,
1038			maybe_new,
1039			&state.keystore,
1040			&mut state.per_session_cache,
1041		)
1042		.await?;
1043
1044		if let Some(per) = per {
1045			state.per_relay_parent.insert(maybe_new, per);
1046		}
1047	}
1048
1049	Ok(())
1050}
1051
/// Unwrap the `Ok` value of a runtime API result, or leave the enclosing function early.
///
/// On `Err`, the error is converted into a `runtime::Error`, wrapped again via `.into()` and
/// handed to `error::log_error`; whatever error that call returns is propagated with `?`.
/// If `error::log_error` returns `Ok`, the enclosing function returns `Ok(None)`.
///
/// The macro can therefore only be used inside functions returning `Result<Option<_>, _>`.
macro_rules! try_runtime_api {
	($x: expr) => {
		match $x {
			Ok(x) => x,
			Err(err) => {
				// Only bubble up fatal errors.
				// NOTE(review): which errors count as fatal is decided by `error::log_error`,
				// which is defined outside this section.
				error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

				// We can't do candidate validation work if we don't have the
				// requisite runtime API data. But these errors should not take
				// down the node.
				return Ok(None)
			},
		}
	};
}
1068
1069fn core_index_from_statement(
1070	validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1071	group_rotation_info: &GroupRotationInfo,
1072	n_cores: u32,
1073	claim_queue: &ClaimQueueSnapshot,
1074	statement: &SignedFullStatementWithPVD,
1075) -> Option<CoreIndex> {
1076	let compact_statement = statement.as_unchecked();
1077	let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1078
1079	gum::trace!(
1080		target:LOG_TARGET,
1081		?group_rotation_info,
1082		?statement,
1083		?validator_to_group,
1084		n_cores,
1085		?candidate_hash,
1086		"Extracting core index from statement"
1087	);
1088
1089	let statement_validator_index = statement.validator_index();
1090	let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1091		gum::debug!(
1092			target: LOG_TARGET,
1093			?group_rotation_info,
1094			?statement,
1095			?validator_to_group,
1096			n_cores,
1097			?candidate_hash,
1098			"Invalid validator index: {:?}",
1099			statement_validator_index
1100		);
1101		return None
1102	};
1103
1104	// First check if the statement para id matches the core assignment.
1105	let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1106
1107	if core_index.0 > n_cores {
1108		gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1109		return None
1110	}
1111
1112	if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1113		let candidate_para_id = candidate.descriptor.para_id();
1114		let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1115
1116		if !assigned_paras.any(|id| id == &candidate_para_id) {
1117			gum::debug!(
1118				target: LOG_TARGET,
1119				?candidate_hash,
1120				?core_index,
1121				assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1122				?candidate_para_id,
1123				"Invalid CoreIndex, core is not assigned to this para_id"
1124			);
1125			return None
1126		}
1127		return Some(core_index)
1128	} else {
1129		return Some(core_index)
1130	}
1131}
1132
/// Load the data necessary to do backing work on top of a relay-parent.
///
/// The session index, validator groups, claim queue and disabled validators are requested for
/// `relay_parent`; validators, node features, executor parameters and the minimum backing votes
/// are obtained through `per_session_cache`.
///
/// Returns:
/// - `Ok(Some(state))` with a fresh `PerRelayParentState` on success,
/// - `Ok(None)` if a runtime API value is unavailable and `try_runtime_api!` does not propagate
///   the error, or if `Validator::construct` fails with anything other than
///   `util::Error::NotAValidator`,
/// - `Err(Error::JoinMultiple(_))` if joining the initial requests fails, or the error
///   propagated by `try_runtime_api!`.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn construct_per_relay_parent_state<Context>(
	ctx: &mut Context,
	relay_parent: Hash,
	keystore: &KeystorePtr,
	per_session_cache: &mut PerSessionCache,
) -> Result<Option<PerRelayParentState>, Error> {
	let parent = relay_parent;

	// Each request is sent (the inner `.await`) before the responses are joined.
	let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
		request_session_index_for_child(parent, ctx.sender()).await,
		request_validator_groups(parent, ctx.sender()).await,
		request_claim_queue(parent, ctx.sender()).await,
		request_disabled_validators(parent, ctx.sender()).await,
	)
	.map_err(Error::JoinMultiple)?;

	// The session index is needed first: it keys the per-session cache lookups below.
	let session_index = try_runtime_api!(session_index);

	let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
	let validators = try_runtime_api!(validators);

	let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
	let node_features = try_runtime_api!(node_features);

	let executor_params =
		per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
	let executor_params = try_runtime_api!(executor_params);

	gum::debug!(target: LOG_TARGET, ?parent, "New state");

	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);

	let minimum_backing_votes = per_session_cache
		.minimum_backing_votes(session_index, parent, ctx.sender())
		.await;
	let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
	let claim_queue = try_runtime_api!(claim_queue);
	let disabled_validators = try_runtime_api!(disabled_validators);

	let signing_context = SigningContext { parent_hash: parent, session_index };
	// `None` means this node is not a validator; any other construction
	// failure aborts with `Ok(None)`.
	let validator = match Validator::construct(
		&validators,
		&disabled_validators,
		signing_context.clone(),
		keystore.clone(),
	) {
		Ok(v) => Some(v),
		Err(util::Error::NotAValidator) => None,
		Err(e) => {
			gum::warn!(
				target: LOG_TARGET,
				err = ?e,
				"Cannot participate in candidate backing",
			);

			return Ok(None)
		},
	};

	// The number of cores is taken to be the number of validator groups.
	let n_cores = validator_groups.len();

	// Backing group per core, restricted to cores present in the claim queue.
	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
	// Core whose group contains the local validator, if any. If several cores
	// match, the one with the highest index wins.
	let mut assigned_core = None;

	for idx in 0..n_cores {
		let core_index = CoreIndex(idx as _);

		// Skip cores without an entry in the claim queue.
		if !claim_queue.contains_key(&core_index) {
			continue
		}

		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
		if let Some(g) = validator_groups.get(group_index.0 as usize) {
			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
				assigned_core = Some(core_index);
			}
			groups.insert(core_index, g.clone());
		}
	}
	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");

	let validator_to_group =
		per_session_cache.validator_to_group(session_index, &validators, &validator_groups);

	let table_context =
		TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };

	Ok(Some(PerRelayParentState {
		parent,
		node_features,
		executor_params,
		assigned_core,
		backed: HashSet::new(),
		table: Table::new(),
		table_context,
		issued_statements: HashSet::new(),
		awaiting_validation: HashSet::new(),
		fallbacks: HashMap::new(),
		minimum_backing_votes,
		n_cores: validator_groups.len() as u32,
		claim_queue: ClaimQueueSnapshot::from(claim_queue),
		validator_to_group,
		group_rotation_info,
	}))
}
1240
/// Outcome of a seconding sanity check for a candidate.
enum SecondingAllowed {
	/// Seconding is not allowed on any leaf.
	No,
	/// Seconding is allowed; holds the hashes of the leaves on which it is allowed.
	// On which leaves is seconding allowed.
	Yes(Vec<Hash>),
}
1246
1247/// Checks whether a candidate can be seconded based on its hypothetical membership in the fragment
1248/// chain.
1249#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1250async fn seconding_sanity_check<Context>(
1251	ctx: &mut Context,
1252	implicit_view: &ImplicitView,
1253	hypothetical_candidate: HypotheticalCandidate,
1254) -> SecondingAllowed {
1255	let mut leaves_for_seconding = Vec::new();
1256	let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1257
1258	let candidate_para = hypothetical_candidate.candidate_para();
1259	let candidate_relay_parent = hypothetical_candidate.relay_parent();
1260	let candidate_hash = hypothetical_candidate.candidate_hash();
1261
1262	for head in implicit_view.leaves() {
1263		// Check that the candidate relay parent is allowed for para, skip the
1264		// leaf otherwise.
1265		let allowed_parents_for_para =
1266			implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1267		if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1268			continue
1269		}
1270
1271		let (tx, rx) = oneshot::channel();
1272		ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1273			HypotheticalMembershipRequest {
1274				candidates: vec![hypothetical_candidate.clone()],
1275				fragment_chain_relay_parent: Some(*head),
1276			},
1277			tx,
1278		))
1279		.await;
1280		let response = rx.map_ok(move |candidate_memberships| {
1281			let is_member_or_potential = candidate_memberships
1282				.into_iter()
1283				.find_map(|(candidate, leaves)| {
1284					(candidate.candidate_hash() == candidate_hash).then_some(leaves)
1285				})
1286				.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1287				.is_some();
1288
1289			(is_member_or_potential, head)
1290		});
1291		responses.push_back(response.boxed());
1292	}
1293
1294	if responses.is_empty() {
1295		return SecondingAllowed::No
1296	}
1297
1298	while let Some(response) = responses.next().await {
1299		match response {
1300			Err(oneshot::Canceled) => {
1301				gum::warn!(
1302					target: LOG_TARGET,
1303					"Failed to reach prospective parachains subsystem for hypothetical membership",
1304				);
1305
1306				return SecondingAllowed::No
1307			},
1308			Ok((is_member_or_potential, head)) => match is_member_or_potential {
1309				false => {
1310					gum::debug!(
1311						target: LOG_TARGET,
1312						?candidate_hash,
1313						leaf_hash = ?head,
1314						"Refusing to second candidate at leaf. Is not a potential member.",
1315					);
1316				},
1317				true => {
1318					leaves_for_seconding.push(*head);
1319				},
1320			},
1321		}
1322	}
1323
1324	if leaves_for_seconding.is_empty() {
1325		SecondingAllowed::No
1326	} else {
1327		SecondingAllowed::Yes(leaves_for_seconding)
1328	}
1329}
1330
1331/// Performs seconding sanity check for an advertisement.
1332#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1333async fn handle_can_second_request<Context>(
1334	ctx: &mut Context,
1335	state: &State,
1336	request: CanSecondRequest,
1337	tx: oneshot::Sender<bool>,
1338) {
1339	let relay_parent = request.candidate_relay_parent;
1340	let response = if state.per_relay_parent.get(&relay_parent).is_some() {
1341		let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1342			candidate_hash: request.candidate_hash,
1343			candidate_para: request.candidate_para_id,
1344			parent_head_data_hash: request.parent_head_data_hash,
1345			candidate_relay_parent: relay_parent,
1346		};
1347
1348		let result =
1349			seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1350
1351		match result {
1352			SecondingAllowed::No => false,
1353			SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1354		}
1355	} else {
1356		// Relay parent is unknown or async backing is disabled.
1357		false
1358	};
1359
1360	let _ = tx.send(response);
1361}
1362
/// Handle the result of a background validation task for `relay_parent`.
///
/// If the relay parent is no longer tracked the command is ignored. Otherwise the candidate is
/// removed from `awaiting_validation` and the command is processed:
/// - `Second`: on success, and unless a statement was already issued for the candidate or the
///   seconding sanity check refuses it, a `Seconded` statement is signed, imported and
///   distributed. On rejection by prospective parachains, or if validation failed, the
///   collator protocol is told the candidate is invalid.
/// - `Attest`: the fallback for the candidate is dropped; if no statement was issued yet, a
///   `Valid` statement is signed, imported and distributed when validation succeeded, and the
///   candidate is marked as having an issued statement either way.
/// - `AttestNoPoV`: validation is retried with the next validator from the candidate's fallback
///   data, if there is one.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_validated_candidate_command<Context>(
	ctx: &mut Context,
	state: &mut State,
	relay_parent: Hash,
	command: ValidatedCandidateCommand,
	metrics: &Metrics,
) -> Result<(), Error> {
	match state.per_relay_parent.get_mut(&relay_parent) {
		Some(rp_state) => {
			let candidate_hash = command.candidate_hash();
			// The background task for this candidate has finished.
			rp_state.awaiting_validation.remove(&candidate_hash);

			match command {
				ValidatedCandidateCommand::Second(res) => match res {
					Ok(outputs) => {
						let BackgroundValidationOutputs {
							candidate,
							commitments,
							persisted_validation_data,
						} = outputs;

						// Never issue a second statement for the same candidate.
						if rp_state.issued_statements.contains(&candidate_hash) {
							return Ok(())
						}

						let receipt = CommittedCandidateReceipt {
							descriptor: candidate.descriptor.clone(),
							commitments,
						};

						let hypothetical_candidate = HypotheticalCandidate::Complete {
							candidate_hash,
							receipt: Arc::new(receipt.clone()),
							persisted_validation_data: persisted_validation_data.clone(),
						};
						// sanity check that we're allowed to second the candidate
						// and that it doesn't conflict with other candidates we've
						// seconded.
						if let SecondingAllowed::No = seconding_sanity_check(
							ctx,
							&state.implicit_view,
							hypothetical_candidate,
						)
						.await
						{
							return Ok(())
						};

						let statement =
							StatementWithPVD::Seconded(receipt, persisted_validation_data);

						// If we get an Error::RejectedByProspectiveParachains,
						// then the statement has not been distributed or imported into
						// the table.
						let res = sign_import_and_distribute_statement(
							ctx,
							rp_state,
							&mut state.per_candidate,
							statement,
							state.keystore.clone(),
							metrics,
						)
						.await;

						if let Err(Error::RejectedByProspectiveParachains) = res {
							let candidate_hash = candidate.hash();
							gum::debug!(
								target: LOG_TARGET,
								relay_parent = ?candidate.descriptor().relay_parent(),
								?candidate_hash,
								"Attempted to second candidate but was rejected by prospective parachains",
							);

							// Ensure the collator is reported.
							ctx.send_message(CollatorProtocolMessage::Invalid(
								candidate.descriptor().relay_parent(),
								candidate,
							))
							.await;

							return Ok(())
						}

						// Any other error is propagated; `None` means no statement was signed.
						if let Some(stmt) = res? {
							match state.per_candidate.get_mut(&candidate_hash) {
								None => {
									gum::warn!(
										target: LOG_TARGET,
										?candidate_hash,
										"Missing `per_candidate` for seconded candidate.",
									);
								},
								Some(p) => p.seconded_locally = true,
							}

							rp_state.issued_statements.insert(candidate_hash);

							metrics.on_candidate_seconded();
							ctx.send_message(CollatorProtocolMessage::Seconded(
								rp_state.parent,
								StatementWithPVD::drop_pvd_from_signed(stmt),
							))
							.await;
						}
					},
					// Validation failed: report the candidate as invalid.
					Err(candidate) => {
						ctx.send_message(CollatorProtocolMessage::Invalid(
							rp_state.parent,
							candidate,
						))
						.await;
					},
				},
				ValidatedCandidateCommand::Attest(res) => {
					// We are done - avoid new validation spawns:
					rp_state.fallbacks.remove(&candidate_hash);
					// sanity check.
					if !rp_state.issued_statements.contains(&candidate_hash) {
						if res.is_ok() {
							let statement = StatementWithPVD::Valid(candidate_hash);

							sign_import_and_distribute_statement(
								ctx,
								rp_state,
								&mut state.per_candidate,
								statement,
								state.keystore.clone(),
								metrics,
							)
							.await?;
						}
						// Recorded even when validation failed, so the candidate
						// is not validated again.
						rp_state.issued_statements.insert(candidate_hash);
					}
				},
				ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
					if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
						// Try the next backing validator as the PoV source, if any is left.
						if let Some(index) = attesting.backing.pop() {
							attesting.from_validator = index;
							let attesting = attesting.clone();

							// The candidate state should be available because we've
							// validated it before, the relay-parent is still around,
							// and candidates are pruned on the basis of relay-parents.
							//
							// If it's not, then no point in validating it anyway.
							if let Some(pvd) = state
								.per_candidate
								.get(&candidate_hash)
								.map(|pc| pc.persisted_validation_data.clone())
							{
								kick_off_validation_work(
									ctx,
									rp_state,
									pvd,
									&state.background_validation_tx,
									attesting,
								)
								.await?;
							}
						}
					} else {
						gum::warn!(
							target: LOG_TARGET,
							"AttestNoPoV was triggered without fallback being available."
						);
						debug_assert!(false);
					}
				},
			}
		},
		None => {
			// simple race condition; can be ignored = this relay-parent
			// is no longer relevant.
		},
	}

	Ok(())
}
1542
1543fn sign_statement(
1544	rp_state: &PerRelayParentState,
1545	statement: StatementWithPVD,
1546	keystore: KeystorePtr,
1547	metrics: &Metrics,
1548) -> Option<SignedFullStatementWithPVD> {
1549	let signed = rp_state
1550		.table_context
1551		.validator
1552		.as_ref()?
1553		.sign(keystore, statement)
1554		.ok()
1555		.flatten()?;
1556	metrics.on_statement_signed();
1557	Some(signed)
1558}
1559
1560/// Import a statement into the statement table and return the summary of the import.
1561///
1562/// This will fail with `Error::RejectedByProspectiveParachains` if the message type is seconded,
1563/// the candidate is fresh, and any of the following are true:
1564/// 1. There is no `PersistedValidationData` attached.
1565/// 2. Prospective parachains subsystem returned an empty `HypotheticalMembership` i.e. did not
1566///    recognize the candidate as being applicable to any of the active leaves.
1567#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1568async fn import_statement<Context>(
1569	ctx: &mut Context,
1570	rp_state: &mut PerRelayParentState,
1571	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1572	statement: &SignedFullStatementWithPVD,
1573) -> Result<Option<TableSummary>, Error> {
1574	let candidate_hash = statement.payload().candidate_hash();
1575
1576	gum::debug!(
1577		target: LOG_TARGET,
1578		statement = ?statement.payload().to_compact(),
1579		validator_index = statement.validator_index().0,
1580		?candidate_hash,
1581		"Importing statement",
1582	);
1583
1584	// If this is a new candidate (statement is 'seconded' and candidate is unknown),
1585	// we need to create an entry in the `PerCandidateState` map.
1586	//
1587	// We also need to inform the prospective parachains subsystem of the seconded candidate.
1588	// If `ProspectiveParachainsMessage::Second` fails, then we return
1589	// Error::RejectedByProspectiveParachains.
1590	//
1591	// Persisted Validation Data should be available - it may already be available
1592	// if this is a candidate we are seconding.
1593	//
1594	// We should also not accept any candidates which have no valid depths under any of
1595	// our active leaves.
1596	if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1597		if !per_candidate.contains_key(&candidate_hash) {
1598			let (tx, rx) = oneshot::channel();
1599			ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1600				IntroduceSecondedCandidateRequest {
1601					candidate_para: candidate.descriptor.para_id(),
1602					candidate_receipt: candidate.clone(),
1603					persisted_validation_data: pvd.clone(),
1604				},
1605				tx,
1606			))
1607			.await;
1608
1609			match rx.await {
1610				Err(oneshot::Canceled) => {
1611					gum::warn!(
1612						target: LOG_TARGET,
1613						"Could not reach the Prospective Parachains subsystem."
1614					);
1615
1616					return Err(Error::RejectedByProspectiveParachains)
1617				},
1618				Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1619				Ok(true) => {},
1620			}
1621
1622			// Only save the candidate if it was approved by prospective parachains.
1623			per_candidate.insert(
1624				candidate_hash,
1625				PerCandidateState {
1626					persisted_validation_data: pvd.clone(),
1627					// This is set after importing when seconding locally.
1628					seconded_locally: false,
1629					relay_parent: candidate.descriptor.relay_parent(),
1630				},
1631			);
1632		}
1633	}
1634
1635	let stmt = primitive_statement_to_table(statement);
1636
1637	let core = core_index_from_statement(
1638		&rp_state.validator_to_group,
1639		&rp_state.group_rotation_info,
1640		rp_state.n_cores,
1641		&rp_state.claim_queue,
1642		statement,
1643	)
1644	.ok_or(Error::CoreIndexUnavailable)?;
1645
1646	Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1647}
1648
1649/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
1650/// misbehaviors as a result of importing a statement.
1651#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1652async fn post_import_statement_actions<Context>(
1653	ctx: &mut Context,
1654	rp_state: &mut PerRelayParentState,
1655	summary: Option<&TableSummary>,
1656) {
1657	if let Some(attested) = summary.as_ref().and_then(|s| {
1658		rp_state.table.attested_candidate(
1659			&s.candidate,
1660			&rp_state.table_context,
1661			rp_state.minimum_backing_votes,
1662		)
1663	}) {
1664		let candidate_hash = attested.candidate.hash();
1665
1666		// `HashSet::insert` returns true if the thing wasn't in there already.
1667		if rp_state.backed.insert(candidate_hash) {
1668			if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
1669				let para_id = backed.candidate().descriptor.para_id();
1670				gum::debug!(
1671					target: LOG_TARGET,
1672					candidate_hash = ?candidate_hash,
1673					relay_parent = ?rp_state.parent,
1674					%para_id,
1675					"Candidate backed",
1676				);
1677
1678				// Inform the prospective parachains subsystem
1679				// that the candidate is now backed.
1680				ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1681					para_id,
1682					candidate_hash,
1683				))
1684				.await;
1685				// Notify statement distribution of backed candidate.
1686				ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1687			} else {
1688				gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1689			}
1690		} else {
1691			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1692		}
1693	} else {
1694		gum::debug!(target: LOG_TARGET, "No attested candidate");
1695	}
1696
1697	issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1698}
1699
1700/// Check if there have happened any new misbehaviors and issue necessary messages.
1701#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1702fn issue_new_misbehaviors<Context>(
1703	ctx: &mut Context,
1704	relay_parent: Hash,
1705	table: &mut Table<TableContext>,
1706) {
1707	// collect the misbehaviors to avoid double mutable self borrow issues
1708	let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1709	for (validator_id, report) in misbehaviors {
1710		// The provisioner waits on candidate-backing, which means
1711		// that we need to send unbounded messages to avoid cycles.
1712		//
1713		// Misbehaviors are bounded by the number of validators and
1714		// the block production protocol.
1715		ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1716			relay_parent,
1717			ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1718		));
1719	}
1720}
1721
1722/// Sign, import, and distribute a statement.
1723#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1724async fn sign_import_and_distribute_statement<Context>(
1725	ctx: &mut Context,
1726	rp_state: &mut PerRelayParentState,
1727	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1728	statement: StatementWithPVD,
1729	keystore: KeystorePtr,
1730	metrics: &Metrics,
1731) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1732	if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1733		let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1734
1735		// `Share` must always be sent before `Backed`. We send the latter in
1736		// `post_import_statement_action` below.
1737		let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1738		ctx.send_unbounded_message(smsg);
1739
1740		post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1741
1742		Ok(Some(signed_statement))
1743	} else {
1744		Ok(None)
1745	}
1746}
1747
1748#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1749async fn background_validate_and_make_available<Context>(
1750	ctx: &mut Context,
1751	rp_state: &mut PerRelayParentState,
1752	params: BackgroundValidationParams<
1753		impl overseer::CandidateBackingSenderTrait,
1754		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1755	>,
1756) -> Result<(), Error> {
1757	let candidate_hash = params.candidate.hash();
1758	let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1759	if rp_state.awaiting_validation.insert(candidate_hash) {
1760		// spawn background task.
1761		let bg = async move {
1762			if let Err(error) = validate_and_make_available(params, core_index).await {
1763				if let Error::BackgroundValidationMpsc(error) = error {
1764					gum::debug!(
1765						target: LOG_TARGET,
1766						?candidate_hash,
1767						?error,
1768						"Mpsc background validation mpsc died during validation- leaf no longer active?"
1769					);
1770				} else {
1771					gum::error!(
1772						target: LOG_TARGET,
1773						?candidate_hash,
1774						?error,
1775						"Failed to validate and make available",
1776					);
1777				}
1778			}
1779		};
1780
1781		ctx.spawn("backing-validation", bg.boxed())
1782			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1783	}
1784
1785	Ok(())
1786}
1787
1788/// Kick off validation work and distribute the result as a signed statement.
1789#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1790async fn kick_off_validation_work<Context>(
1791	ctx: &mut Context,
1792	rp_state: &mut PerRelayParentState,
1793	persisted_validation_data: PersistedValidationData,
1794	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1795	attesting: AttestingData,
1796) -> Result<(), Error> {
1797	// Do nothing if the local validator is disabled or not a validator at all
1798	match rp_state.table_context.local_validator_is_disabled() {
1799		Some(true) => {
1800			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1801			return Ok(())
1802		},
1803		Some(false) => {}, // we are not disabled - move on
1804		None => {
1805			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1806			return Ok(())
1807		},
1808	}
1809
1810	let candidate_hash = attesting.candidate.hash();
1811	if rp_state.issued_statements.contains(&candidate_hash) {
1812		return Ok(())
1813	}
1814
1815	gum::debug!(
1816		target: LOG_TARGET,
1817		candidate_hash = ?candidate_hash,
1818		candidate_receipt = ?attesting.candidate,
1819		"Kicking off validation",
1820	);
1821
1822	let bg_sender = ctx.sender().clone();
1823	let pov = PoVData::FetchFromValidator {
1824		from_validator: attesting.from_validator,
1825		candidate_hash,
1826		pov_hash: attesting.pov_hash,
1827	};
1828
1829	background_validate_and_make_available(
1830		ctx,
1831		rp_state,
1832		BackgroundValidationParams {
1833			sender: bg_sender,
1834			tx_command: background_validation_tx.clone(),
1835			candidate: attesting.candidate,
1836			relay_parent: rp_state.parent,
1837			node_features: rp_state.node_features.clone(),
1838			executor_params: Arc::clone(&rp_state.executor_params),
1839			persisted_validation_data,
1840			pov,
1841			n_validators: rp_state.table_context.validators.len(),
1842			make_command: ValidatedCandidateCommand::Attest,
1843		},
1844	)
1845	.await
1846}
1847
1848/// Import the statement and kick off validation work if it is a part of our assignment.
1849#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1850async fn maybe_validate_and_import<Context>(
1851	ctx: &mut Context,
1852	state: &mut State,
1853	relay_parent: Hash,
1854	statement: SignedFullStatementWithPVD,
1855) -> Result<(), Error> {
1856	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1857		Some(r) => r,
1858		None => {
1859			gum::trace!(
1860				target: LOG_TARGET,
1861				?relay_parent,
1862				"Received statement for unknown relay-parent"
1863			);
1864
1865			return Ok(())
1866		},
1867	};
1868
1869	// Don't import statement if the sender is disabled
1870	if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1871		gum::debug!(
1872			target: LOG_TARGET,
1873			sender_validator_idx = ?statement.validator_index(),
1874			"Not importing statement because the sender is disabled"
1875		);
1876		return Ok(())
1877	}
1878
1879	let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1880
1881	// if we get an Error::RejectedByProspectiveParachains,
1882	// we will do nothing.
1883	if let Err(Error::RejectedByProspectiveParachains) = res {
1884		gum::debug!(
1885			target: LOG_TARGET,
1886			?relay_parent,
1887			"Statement rejected by prospective parachains."
1888		);
1889
1890		return Ok(())
1891	}
1892
1893	let summary = res?;
1894	post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1895
1896	if let Some(summary) = summary {
1897		// import_statement already takes care of communicating with the
1898		// prospective parachains subsystem. At this point, the candidate
1899		// has already been accepted by the subsystem.
1900
1901		let candidate_hash = summary.candidate;
1902
1903		if Some(summary.group_id) != rp_state.assigned_core {
1904			return Ok(())
1905		}
1906
1907		let attesting = match statement.payload() {
1908			StatementWithPVD::Seconded(receipt, _) => {
1909				let attesting = AttestingData {
1910					candidate: rp_state
1911						.table
1912						.get_candidate(&candidate_hash)
1913						.ok_or(Error::CandidateNotFound)?
1914						.to_plain(),
1915					pov_hash: receipt.descriptor.pov_hash(),
1916					from_validator: statement.validator_index(),
1917					backing: Vec::new(),
1918				};
1919				rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1920				attesting
1921			},
1922			StatementWithPVD::Valid(candidate_hash) => {
1923				if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1924					let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1925					if our_index == Some(statement.validator_index()) {
1926						return Ok(())
1927					}
1928
1929					if rp_state.awaiting_validation.contains(candidate_hash) {
1930						// Job already running:
1931						attesting.backing.push(statement.validator_index());
1932						return Ok(())
1933					} else {
1934						// No job, so start another with current validator:
1935						attesting.from_validator = statement.validator_index();
1936						attesting.clone()
1937					}
1938				} else {
1939					return Ok(())
1940				}
1941			},
1942		};
1943
1944		// After `import_statement` succeeds, the candidate entry is guaranteed
1945		// to exist.
1946		if let Some(pvd) = state
1947			.per_candidate
1948			.get(&candidate_hash)
1949			.map(|pc| pc.persisted_validation_data.clone())
1950		{
1951			kick_off_validation_work(
1952				ctx,
1953				rp_state,
1954				pvd,
1955				&state.background_validation_tx,
1956				attesting,
1957			)
1958			.await?;
1959		}
1960	}
1961	Ok(())
1962}
1963
1964/// Kick off background validation with intent to second.
1965#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1966async fn validate_and_second<Context>(
1967	ctx: &mut Context,
1968	rp_state: &mut PerRelayParentState,
1969	persisted_validation_data: PersistedValidationData,
1970	candidate: &CandidateReceipt,
1971	pov: Arc<PoV>,
1972	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1973) -> Result<(), Error> {
1974	let candidate_hash = candidate.hash();
1975
1976	gum::debug!(
1977		target: LOG_TARGET,
1978		candidate_hash = ?candidate_hash,
1979		candidate_receipt = ?candidate,
1980		"Validate and second candidate",
1981	);
1982
1983	let bg_sender = ctx.sender().clone();
1984	background_validate_and_make_available(
1985		ctx,
1986		rp_state,
1987		BackgroundValidationParams {
1988			sender: bg_sender,
1989			tx_command: background_validation_tx.clone(),
1990			candidate: candidate.clone(),
1991			relay_parent: rp_state.parent,
1992			node_features: rp_state.node_features.clone(),
1993			executor_params: Arc::clone(&rp_state.executor_params),
1994			persisted_validation_data,
1995			pov: PoVData::Ready(pov),
1996			n_validators: rp_state.table_context.validators.len(),
1997			make_command: ValidatedCandidateCommand::Second,
1998		},
1999	)
2000	.await?;
2001
2002	Ok(())
2003}
2004
2005#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2006async fn handle_second_message<Context>(
2007	ctx: &mut Context,
2008	state: &mut State,
2009	candidate: CandidateReceipt,
2010	persisted_validation_data: PersistedValidationData,
2011	pov: PoV,
2012	metrics: &Metrics,
2013) -> Result<(), Error> {
2014	let _timer = metrics.time_process_second();
2015
2016	let candidate_hash = candidate.hash();
2017	let relay_parent = candidate.descriptor().relay_parent();
2018
2019	if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2020		gum::warn!(
2021			target: LOG_TARGET,
2022			?candidate_hash,
2023			"Candidate backing was asked to second candidate with wrong PVD",
2024		);
2025
2026		return Ok(())
2027	}
2028
2029	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2030		None => {
2031			gum::trace!(
2032				target: LOG_TARGET,
2033				?relay_parent,
2034				?candidate_hash,
2035				"We were asked to second a candidate outside of our view."
2036			);
2037
2038			return Ok(())
2039		},
2040		Some(r) => r,
2041	};
2042
2043	// Just return if the local validator is disabled. If we are here the local node should be a
2044	// validator but defensively use `unwrap_or(false)` to continue processing in this case.
2045	if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2046		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2047		return Ok(())
2048	}
2049
2050	let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2051
2052	// Sanity check that candidate is from our assignment.
2053	if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2054		gum::debug!(
2055			target: LOG_TARGET,
2056			our_assignment_core = ?rp_state.assigned_core,
2057			our_assignment_paras = ?assigned_paras,
2058			collation = ?candidate.descriptor().para_id(),
2059			"Subsystem asked to second for para outside of our assignment",
2060		);
2061		return Ok(());
2062	}
2063
2064	gum::debug!(
2065		target: LOG_TARGET,
2066		our_assignment_core = ?rp_state.assigned_core,
2067		our_assignment_paras = ?assigned_paras,
2068		collation = ?candidate.descriptor().para_id(),
2069		"Current assignments vs collation",
2070	);
2071
2072	// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
2073	// Seconded statement only if we have not signed a Valid statement for the requested candidate.
2074	//
2075	// The actual logic of issuing the signed statement checks that this isn't
2076	// conflicting with other seconded candidates. Not doing that check here
2077	// gives other subsystems the ability to get us to execute arbitrary candidates,
2078	// but no more.
2079	if !rp_state.issued_statements.contains(&candidate_hash) {
2080		let pov = Arc::new(pov);
2081
2082		validate_and_second(
2083			ctx,
2084			rp_state,
2085			persisted_validation_data,
2086			&candidate,
2087			pov,
2088			&state.background_validation_tx,
2089		)
2090		.await?;
2091	}
2092
2093	Ok(())
2094}
2095
2096#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2097async fn handle_statement_message<Context>(
2098	ctx: &mut Context,
2099	state: &mut State,
2100	relay_parent: Hash,
2101	statement: SignedFullStatementWithPVD,
2102	metrics: &Metrics,
2103) -> Result<(), Error> {
2104	let _timer = metrics.time_process_statement();
2105
2106	// Validator disabling is handled in `maybe_validate_and_import`
2107	match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2108		Err(Error::ValidationFailed(_)) => Ok(()),
2109		Err(e) => Err(e),
2110		Ok(()) => Ok(()),
2111	}
2112}
2113
2114fn handle_get_backable_candidates_message(
2115	state: &State,
2116	requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2117	tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2118	metrics: &Metrics,
2119) -> Result<(), Error> {
2120	let _timer = metrics.time_get_backed_candidates();
2121
2122	let mut backed = HashMap::with_capacity(requested_candidates.len());
2123
2124	for (para_id, para_candidates) in requested_candidates {
2125		for (candidate_hash, relay_parent) in para_candidates.iter() {
2126			let rp_state = match state.per_relay_parent.get(&relay_parent) {
2127				Some(rp_state) => rp_state,
2128				None => {
2129					gum::debug!(
2130						target: LOG_TARGET,
2131						?relay_parent,
2132						?candidate_hash,
2133						"Requested candidate's relay parent is out of view",
2134					);
2135					break
2136				},
2137			};
2138			let maybe_backed_candidate = rp_state
2139				.table
2140				.attested_candidate(
2141					candidate_hash,
2142					&rp_state.table_context,
2143					rp_state.minimum_backing_votes,
2144				)
2145				.and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context));
2146
2147			if let Some(backed_candidate) = maybe_backed_candidate {
2148				backed
2149					.entry(para_id)
2150					.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2151					.push(backed_candidate);
2152			} else {
2153				break
2154			}
2155		}
2156	}
2157
2158	tx.send(backed).map_err(|data| Error::Send(data))?;
2159	Ok(())
2160}