polkadot_node_core_backing/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Implements the `CandidateBackingSubsystem`.
18//!
19//! This subsystem maintains the entire responsibility of tracking parachain
20//! candidates which can be backed, as well as the issuance of statements
21//! about candidates when run on a validator node.
22//!
23//! There are two types of statements: `Seconded` and `Valid`.
24//! `Seconded` implies `Valid`, and nothing should be stated as
//! `Valid` unless it's already been `Seconded`.
26//!
27//! Validators may only second candidates which fall under their own group
28//! assignment, and they may only second one candidate per depth per active leaf.
//! Candidates which are stated as either `Seconded` or `Valid` by a majority of the
30//! assigned group of validators may be backed on-chain and proceed to the availability
31//! stage.
32//!
33//! Depth is a concept relating to asynchronous backing, by which
34//! short sub-chains of candidates are backed and extended off-chain, and then placed
35//! asynchronously into blocks of the relay chain as those are authored and as the
36//! relay-chain state becomes ready for them. Asynchronous backing allows parachains to
37//! grow mostly independently from the state of the relay chain, which gives more time for
38//! parachains to be validated and thereby increases performance.
39//!
40//! Most of the work of asynchronous backing is handled by the Prospective Parachains
41//! subsystem. The 'depth' of a parachain block with respect to a relay chain block is
42//! a measure of how many parachain blocks are between the most recent included parachain block
43//! in the post-state of the relay-chain block and the candidate. For instance,
44//! a candidate that descends directly from the most recent parachain block in the relay-chain
45//! state has depth 0. The child of that candidate would have depth 1. And so on.
46//!
47//! The candidate backing subsystem keeps track of a set of 'active leaves' which are the
48//! most recent blocks in the relay-chain (which is in fact a tree) which could be built
49//! upon. Depth is always measured against active leaves, and the valid relay-parent that
50//! each candidate can have is determined by the active leaves. The Prospective Parachains
51//! subsystem enforces that the relay-parent increases monotonically, so that logic
52//! is not handled here. By communicating with the Prospective Parachains subsystem,
53//! this subsystem extrapolates an "implicit view" from the set of currently active leaves,
54//! which determines the set of all recent relay-chain block hashes which could be relay-parents
55//! for candidates backed in children of the active leaves.
56//!
57//! In fact, this subsystem relies on the Statement Distribution subsystem to prevent spam
58//! by enforcing the rule that each validator may second at most one candidate per depth per
59//! active leaf. This bounds the number of candidates that the system needs to consider and
60//! is not handled within this subsystem, except for candidates seconded locally.
61//!
62//! This subsystem also handles relay-chain heads which don't support asynchronous backing.
63//! For such active leaves, the only valid relay-parent is the leaf hash itself and the only
64//! allowed depth is 0.
65
66#![deny(unused_crate_dependencies)]
67
68use std::{
69	collections::{HashMap, HashSet},
70	sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75	channel::{mpsc, oneshot},
76	future::BoxFuture,
77	stream::FuturesOrdered,
78	FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84	AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85	ValidationResult,
86};
87use polkadot_node_subsystem::{
88	messages::{
89		AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
90		CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
91		HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
92		ProspectiveParachainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
93		RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
94		StoreAvailableDataError,
95	},
96	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
97};
98use polkadot_node_subsystem_util::{
99	self as util,
100	backing_implicit_view::{FetchError as ImplicitViewFetchError, View as ImplicitView},
101	executor_params_at_relay_parent, request_from_runtime, request_session_index_for_child,
102	request_validator_groups, request_validators,
103	runtime::{
104		self, fetch_claim_queue, prospective_parachains_mode, request_min_backing_votes,
105		ClaimQueueSnapshot, ProspectiveParachainsMode,
106	},
107	Validator,
108};
109use polkadot_parachain_primitives::primitives::IsSystem;
110use polkadot_primitives::{
111	node_features::FeatureIndex,
112	vstaging::{
113		BackedCandidate, CandidateReceiptV2 as CandidateReceipt,
114		CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreState,
115	},
116	CandidateCommitments, CandidateHash, CoreIndex, ExecutorParams, GroupIndex, GroupRotationInfo,
117	Hash, Id as ParaId, IndexedVec, NodeFeatures, PersistedValidationData, SessionIndex,
118	SigningContext, ValidationCode, ValidatorId, ValidatorIndex, ValidatorSignature,
119	ValidityAttestation,
120};
121use polkadot_statement_table::{
122	generic::AttestedCandidate as TableAttestedCandidate,
123	v2::{
124		SignedStatement as TableSignedStatement, Statement as TableStatement,
125		Summary as TableSummary,
126	},
127	Config as TableConfig, Context as TableContextTrait, Table,
128};
129use sp_keystore::KeystorePtr;
130use util::runtime::{get_disabled_validators_with_fallback, request_node_features};
131
132mod error;
133
134mod metrics;
135use self::metrics::Metrics;
136
137#[cfg(test)]
138mod tests;
139
140const LOG_TARGET: &str = "parachain::candidate-backing";
141
/// PoV data to validate.
///
/// Consumed by `validate_and_make_available`: a `Ready` PoV is used as-is, while
/// `FetchFromValidator` is resolved through `request_pov` before validation starts.
enum PoVData {
	/// Already available (from candidate selection).
	Ready(Arc<PoV>),
	/// Needs to be fetched from validator (we are checking a signed statement).
	FetchFromValidator {
		/// The validator the PoV is requested from.
		from_validator: ValidatorIndex,
		/// Hash of the candidate the requested PoV belongs to.
		candidate_hash: CandidateHash,
		/// Hash of the PoV to fetch.
		pov_hash: Hash,
	},
}
153
/// Result of a background validation job.
///
/// Background tasks send this, paired with the relay parent hash, over the
/// `background_validation_tx` channel so the main loop can act on it.
enum ValidatedCandidateCommand {
	/// We were instructed to second the candidate that has been already validated.
	Second(BackgroundValidationResult),
	/// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
	/// We were not able to `Attest` because backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
}
162
163impl std::fmt::Debug for ValidatedCandidateCommand {
164	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
165		let candidate_hash = self.candidate_hash();
166		match *self {
167			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
168			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
169			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
170		}
171	}
172}
173
174impl ValidatedCandidateCommand {
175	fn candidate_hash(&self) -> CandidateHash {
176		match *self {
177			ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
178			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
179			ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
180			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
181			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
182		}
183	}
184}
185
/// The candidate backing subsystem.
///
/// Only carries what is handed to `run` when the subsystem is started.
pub struct CandidateBackingSubsystem {
	/// Keystore handle; passed to `run`, which stores it in the subsystem `State`.
	keystore: KeystorePtr,
	/// Metrics handle; passed to `run`.
	metrics: Metrics,
}
191
impl CandidateBackingSubsystem {
	/// Create a new instance of the `CandidateBackingSubsystem`.
	///
	/// Both arguments are stored unchanged; no work is started until the overseer calls
	/// `start`.
	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
		Self { keystore, metrics }
	}
}
198
199#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
200impl<Context> CandidateBackingSubsystem
201where
202	Context: Send + Sync,
203{
204	fn start(self, ctx: Context) -> SpawnedSubsystem {
205		let future = async move {
206			run(ctx, self.keystore, self.metrics)
207				.await
208				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
209		}
210		.boxed();
211
212		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
213	}
214}
215
/// State tracked for a single relay parent; these are the values of
/// `State::per_relay_parent`.
struct PerRelayParentState {
	/// The `ProspectiveParachainsMode` recorded for this relay parent.
	prospective_parachains_mode: ProspectiveParachainsMode,
	/// The hash of the relay parent on top of which this job is doing its work.
	parent: Hash,
	/// Session index.
	session_index: SessionIndex,
	/// The `CoreIndex` assigned to the local validator at this relay parent.
	assigned_core: Option<CoreIndex>,
	/// The candidates that are backed by enough validators in their group, by hash.
	backed: HashSet<CandidateHash>,
	/// The table of candidates and statements under this relay-parent.
	table: Table<TableContext>,
	/// The table context, including groups.
	table_context: TableContext,
	/// We issued `Seconded` or `Valid` statements about these candidates.
	issued_statements: HashSet<CandidateHash>,
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
	fallbacks: HashMap<CandidateHash, AttestingData>,
	/// The minimum backing votes threshold.
	minimum_backing_votes: u32,
	/// If true, we're appending extra bits in the BackedCandidate validator indices bitfield,
	/// which represent the assigned core index. True if ElasticScalingMVP is enabled.
	inject_core_index: bool,
	/// The number of cores.
	n_cores: u32,
	/// Claim queue state. If the runtime API is not available, it'll be populated with info from
	/// availability cores.
	claim_queue: ClaimQueueSnapshot,
	/// The validator index -> group mapping at this relay parent.
	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
	/// The associated group rotation information.
	group_rotation_info: GroupRotationInfo,
}
251
/// State tracked for a single candidate; these are the values of `State::per_candidate`.
struct PerCandidateState {
	/// The persisted validation data kept alongside the candidate.
	persisted_validation_data: PersistedValidationData,
	/// NOTE(review): presumably `true` once the local validator has seconded this candidate;
	/// the code that sets it is outside this section, so confirm against the writers.
	seconded_locally: bool,
	/// The candidate's relay parent. An entry is pruned from `State::per_candidate` as soon as
	/// this hash is no longer a key of `State::per_relay_parent`.
	relay_parent: Hash,
}
257
/// State tracked for an active leaf; these are the values of `State::per_leaf`.
///
/// The variant mirrors the `ProspectiveParachainsMode` the leaf was created with (see
/// `ActiveLeafState::new`).
enum ActiveLeafState {
	/// If prospective-parachains is disabled, one validator may only back one candidate per
	/// paraid. `seconded` holds the para ids recorded through `add_seconded_candidate`.
	ProspectiveParachainsDisabled { seconded: HashSet<ParaId> },
	/// Prospective parachains are enabled; both values are copied verbatim from
	/// `ProspectiveParachainsMode::Enabled`.
	ProspectiveParachainsEnabled { max_candidate_depth: usize, allowed_ancestry_len: usize },
}
264
265impl ActiveLeafState {
266	fn new(mode: ProspectiveParachainsMode) -> Self {
267		match mode {
268			ProspectiveParachainsMode::Disabled =>
269				Self::ProspectiveParachainsDisabled { seconded: HashSet::new() },
270			ProspectiveParachainsMode::Enabled { max_candidate_depth, allowed_ancestry_len } =>
271				Self::ProspectiveParachainsEnabled { max_candidate_depth, allowed_ancestry_len },
272		}
273	}
274
275	fn add_seconded_candidate(&mut self, para_id: ParaId) {
276		if let Self::ProspectiveParachainsDisabled { seconded } = self {
277			seconded.insert(para_id);
278		}
279	}
280}
281
282impl From<&ActiveLeafState> for ProspectiveParachainsMode {
283	fn from(state: &ActiveLeafState) -> Self {
284		match *state {
285			ActiveLeafState::ProspectiveParachainsDisabled { .. } =>
286				ProspectiveParachainsMode::Disabled,
287			ActiveLeafState::ProspectiveParachainsEnabled {
288				max_candidate_depth,
289				allowed_ancestry_len,
290			} => ProspectiveParachainsMode::Enabled { max_candidate_depth, allowed_ancestry_len },
291		}
292	}
293}
294
/// The state of the subsystem.
///
/// Created once in `run` and threaded through every handler of the main loop.
struct State {
	/// The utility for managing the implicit and explicit views in a consistent way.
	///
	/// We only feed leaves which have prospective parachains enabled to this view.
	implicit_view: ImplicitView,
	/// State tracked for all active leaves, whether or not they have prospective parachains
	/// enabled.
	per_leaf: HashMap<Hash, ActiveLeafState>,
	/// State tracked for all relay-parents backing work is ongoing for. This includes
	/// all active leaves.
	///
	/// Relay-parents fall into one of 3 categories:
	///   1. active leaves which do support prospective parachains
	///   2. active leaves which do not support prospective parachains
	///   3. relay-chain blocks which are ancestors of an active leaf and do support prospective
	///      parachains.
	///
	/// Relay-chain blocks which don't support prospective parachains are
	/// never included in the fragment chains of active leaves which do.
	///
	/// While it would be technically possible to support such leaves in
	/// fragment chains, it only benefits the transition period when asynchronous
	/// backing is being enabled and complicates code.
	per_relay_parent: HashMap<Hash, PerRelayParentState>,
	/// State tracked for all candidates relevant to the implicit view.
	///
	/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
	/// or explicit view for which a `Seconded` statement has been successfully imported.
	per_candidate: HashMap<CandidateHash, PerCandidateState>,
	/// Cache the per-session Validator->Group mapping.
	///
	/// `State::new` bounds this cache with `ByLength::new(2)`.
	validator_to_group_cache:
		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
	/// A cloneable sender which is dispatched to background candidate validation tasks to inform
	/// the main task of the result.
	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The handle to the keystore used for signing.
	keystore: KeystorePtr,
}
334
335impl State {
336	fn new(
337		background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
338		keystore: KeystorePtr,
339	) -> Self {
340		State {
341			implicit_view: ImplicitView::default(),
342			per_leaf: HashMap::default(),
343			per_relay_parent: HashMap::default(),
344			per_candidate: HashMap::new(),
345			validator_to_group_cache: LruMap::new(ByLength::new(2)),
346			background_validation_tx,
347			keystore,
348		}
349	}
350}
351
352#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
353async fn run<Context>(
354	mut ctx: Context,
355	keystore: KeystorePtr,
356	metrics: Metrics,
357) -> FatalResult<()> {
358	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
359	let mut state = State::new(background_validation_tx, keystore);
360
361	loop {
362		let res =
363			run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
364
365		match res {
366			Ok(()) => break,
367			Err(e) => crate::error::log_error(Err(e))?,
368		}
369	}
370
371	Ok(())
372}
373
/// Processes events until the overseer sends `Conclude` or a handler fails.
///
/// Two sources are polled concurrently:
/// - `background_validation_rx`: results of background validation jobs, forwarded to
///   `handle_validated_candidate_command`;
/// - the overseer: active-leaves updates, the `Conclude` signal and subsystem messages.
///
/// Returns `Ok(())` only on `OverseerSignal::Conclude`; any error from a handler or from
/// receiving an overseer message is returned to the caller.
///
/// # Panics
///
/// Panics if the background validation channel is closed. The matching sender is stored in
/// `State`, hence the `qed` in the panic message.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn run_iteration<Context>(
	ctx: &mut Context,
	state: &mut State,
	metrics: &Metrics,
	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
) -> Result<(), Error> {
	loop {
		futures::select!(
			// A background validation job reported its outcome.
			validated_command = background_validation_rx.next().fuse() => {
				if let Some((relay_parent, command)) = validated_command {
					handle_validated_candidate_command(
						&mut *ctx,
						state,
						relay_parent,
						command,
						metrics,
					).await?;
				} else {
					panic!("background_validation_tx always alive at this point; qed");
				}
			}
			// A signal or message arrived from the overseer.
			from_overseer = ctx.recv().fuse() => {
				match from_overseer.map_err(Error::OverseerExited)? {
					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
						handle_active_leaves_update(
							&mut *ctx,
							update,
							state,
						).await?;
					}
					// Finality notifications are ignored here.
					FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
					FromOrchestra::Communication { msg } => {
						handle_communication(&mut *ctx, state, msg, metrics).await?;
					}
				}
			}
		)
	}
}
415
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this: everything required for spawning a
/// validation job, plus the list of backing validators we can still try.
#[derive(Clone)]
struct AttestingData {
	/// The candidate to attest.
	candidate: CandidateReceipt,
	/// Hash of the PoV we need to fetch.
	pov_hash: Hash,
	/// Validator we are currently trying to get the PoV from.
	from_validator: ValidatorIndex,
	/// Other backing validators we can try in case `from_validator` failed.
	backing: Vec<ValidatorIndex>,
}
432
/// Context handed to the statement `Table`; implements `TableContextTrait` with `CoreIndex`
/// as the group id and `ValidatorIndex` as the authority id.
#[derive(Default, Debug)]
struct TableContext {
	/// The local validator, if any (queried by `local_validator_is_disabled`).
	validator: Option<Validator>,
	/// Members of the group for each core; backs `is_member_of` and `get_group_size`.
	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
	/// Validator ids. NOTE(review): presumably the full validator set at the relay parent,
	/// indexed by `ValidatorIndex` — confirm against the code that fills it.
	validators: Vec<ValidatorId>,
	/// Indices of the disabled validators (queried by `validator_is_disabled`).
	disabled_validators: Vec<ValidatorIndex>,
}
440
441impl TableContext {
442	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
443	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
444		self.disabled_validators
445			.iter()
446			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
447	}
448
449	// Returns `true` if the local validator is in the disabled validators list
450	pub fn local_validator_is_disabled(&self) -> Option<bool> {
451		self.validator.as_ref().map(|v| v.disabled())
452	}
453}
454
455impl TableContextTrait for TableContext {
456	type AuthorityId = ValidatorIndex;
457	type Digest = CandidateHash;
458	type GroupId = CoreIndex;
459	type Signature = ValidatorSignature;
460	type Candidate = CommittedCandidateReceipt;
461
462	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
463		candidate.hash()
464	}
465
466	fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
467		self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
468	}
469
470	fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
471		self.groups.get(group).map(|g| g.len())
472	}
473}
474
475// It looks like it's not possible to do an `impl From` given the current state of
476// the code. So this does the necessary conversion.
477fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
478	let statement = match s.payload() {
479		StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
480		StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
481	};
482
483	TableSignedStatement {
484		statement,
485		signature: s.signature().clone(),
486		sender: s.validator_index(),
487	}
488}
489
490fn table_attested_to_backed(
491	attested: TableAttestedCandidate<
492		CoreIndex,
493		CommittedCandidateReceipt,
494		ValidatorIndex,
495		ValidatorSignature,
496	>,
497	table_context: &TableContext,
498	inject_core_index: bool,
499) -> Option<BackedCandidate> {
500	let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
501
502	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
503		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
504
505	let group = table_context.groups.get(&core_index)?;
506
507	let mut validator_indices = BitVec::with_capacity(group.len());
508
509	validator_indices.resize(group.len(), false);
510
511	// The order of the validity votes in the backed candidate must match
512	// the order of bits set in the bitfield, which is not necessarily
513	// the order of the `validity_votes` we got from the table.
514	let mut vote_positions = Vec::with_capacity(validity_votes.len());
515	for (orig_idx, id) in ids.iter().enumerate() {
516		if let Some(position) = group.iter().position(|x| x == id) {
517			validator_indices.set(position, true);
518			vote_positions.push((orig_idx, position));
519		} else {
520			gum::warn!(
521				target: LOG_TARGET,
522				"Logic error: Validity vote from table does not correspond to group",
523			);
524
525			return None
526		}
527	}
528	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
529
530	Some(BackedCandidate::new(
531		candidate,
532		vote_positions
533			.into_iter()
534			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
535			.collect(),
536		validator_indices,
537		inject_core_index.then_some(core_index),
538	))
539}
540
541async fn store_available_data(
542	sender: &mut impl overseer::CandidateBackingSenderTrait,
543	n_validators: u32,
544	candidate_hash: CandidateHash,
545	available_data: AvailableData,
546	expected_erasure_root: Hash,
547	core_index: CoreIndex,
548	node_features: NodeFeatures,
549) -> Result<(), Error> {
550	let (tx, rx) = oneshot::channel();
551	// Important: the `av-store` subsystem will check if the erasure root of the `available_data`
552	// matches `expected_erasure_root` which was provided by the collator in the `CandidateReceipt`.
553	// This check is consensus critical and the `backing` subsystem relies on it for ensuring
554	// candidate validity.
555	sender
556		.send_message(AvailabilityStoreMessage::StoreAvailableData {
557			candidate_hash,
558			n_validators,
559			available_data,
560			expected_erasure_root,
561			core_index,
562			node_features,
563			tx,
564		})
565		.await;
566
567	rx.await
568		.map_err(Error::StoreAvailableDataChannel)?
569		.map_err(Error::StoreAvailableData)
570}
571
572// Make a `PoV` available.
573//
574// This calls the AV store to write the available data to storage. The AV store also checks the
575// erasure root matches the `expected_erasure_root`.
576// This returns `Err()` on erasure root mismatch or due to any AV store subsystem error.
577//
578// Otherwise, it returns `Ok(())`.
579async fn make_pov_available(
580	sender: &mut impl overseer::CandidateBackingSenderTrait,
581	n_validators: usize,
582	pov: Arc<PoV>,
583	candidate_hash: CandidateHash,
584	validation_data: PersistedValidationData,
585	expected_erasure_root: Hash,
586	core_index: CoreIndex,
587	node_features: NodeFeatures,
588) -> Result<(), Error> {
589	store_available_data(
590		sender,
591		n_validators as u32,
592		candidate_hash,
593		AvailableData { pov, validation_data },
594		expected_erasure_root,
595		core_index,
596		node_features,
597	)
598	.await
599}
600
601async fn request_pov(
602	sender: &mut impl overseer::CandidateBackingSenderTrait,
603	relay_parent: Hash,
604	from_validator: ValidatorIndex,
605	para_id: ParaId,
606	candidate_hash: CandidateHash,
607	pov_hash: Hash,
608) -> Result<Arc<PoV>, Error> {
609	let (tx, rx) = oneshot::channel();
610	sender
611		.send_message(AvailabilityDistributionMessage::FetchPoV {
612			relay_parent,
613			from_validator,
614			para_id,
615			candidate_hash,
616			pov_hash,
617			tx,
618		})
619		.await;
620
621	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
622	Ok(Arc::new(pov))
623}
624
625async fn request_candidate_validation(
626	sender: &mut impl overseer::CandidateBackingSenderTrait,
627	validation_data: PersistedValidationData,
628	validation_code: ValidationCode,
629	candidate_receipt: CandidateReceipt,
630	pov: Arc<PoV>,
631	executor_params: ExecutorParams,
632) -> Result<ValidationResult, Error> {
633	let (tx, rx) = oneshot::channel();
634	let is_system = candidate_receipt.descriptor.para_id().is_system();
635	let relay_parent = candidate_receipt.descriptor.relay_parent();
636
637	sender
638		.send_message(CandidateValidationMessage::ValidateFromExhaustive {
639			validation_data,
640			validation_code,
641			candidate_receipt,
642			pov,
643			executor_params,
644			exec_kind: if is_system {
645				PvfExecKind::BackingSystemParas(relay_parent)
646			} else {
647				PvfExecKind::Backing(relay_parent)
648			},
649			response_sender: tx,
650		})
651		.await;
652
653	match rx.await {
654		Ok(Ok(validation_result)) => Ok(validation_result),
655		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
656		Err(err) => Err(Error::ValidateFromExhaustive(err)),
657	}
658}
659
/// What a successful background validation produces (the `Ok` side of
/// `BackgroundValidationResult`).
struct BackgroundValidationOutputs {
	/// The receipt of the validated candidate.
	candidate: CandidateReceipt,
	/// The commitments returned by `ValidationResult::Valid`.
	commitments: CandidateCommitments,
	/// The persisted validation data returned by `ValidationResult::Valid`.
	persisted_validation_data: PersistedValidationData,
}
665
/// Outcome of background validation: the outputs on success, or the receipt of the candidate
/// that turned out to be invalid.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
667
/// Everything `validate_and_make_available` needs to validate one candidate in the
/// background.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
	/// Sender used for all requests to other subsystems.
	sender: S,
	/// Channel on which the resulting command is reported, paired with `relay_parent`.
	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The candidate to validate.
	candidate: CandidateReceipt,
	/// Relay parent used for the runtime requests and sent along with the command.
	relay_parent: Hash,
	/// Session index used when requesting the node features.
	session_index: SessionIndex,
	/// Persisted validation data handed to candidate validation.
	persisted_validation_data: PersistedValidationData,
	/// The PoV, either at hand already or to be fetched from a validator.
	pov: PoVData,
	/// Number of validators, forwarded to the availability store.
	n_validators: usize,
	/// Wraps the validation result into the command to report (e.g. `Second` or `Attest`).
	make_command: F,
}
679
680async fn validate_and_make_available(
681	params: BackgroundValidationParams<
682		impl overseer::CandidateBackingSenderTrait,
683		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
684	>,
685	core_index: CoreIndex,
686) -> Result<(), Error> {
687	let BackgroundValidationParams {
688		mut sender,
689		mut tx_command,
690		candidate,
691		relay_parent,
692		session_index,
693		persisted_validation_data,
694		pov,
695		n_validators,
696		make_command,
697	} = params;
698
699	let validation_code = {
700		let validation_code_hash = candidate.descriptor().validation_code_hash();
701		let (tx, rx) = oneshot::channel();
702		sender
703			.send_message(RuntimeApiMessage::Request(
704				relay_parent,
705				RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
706			))
707			.await;
708
709		let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
710		match code {
711			Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
712			Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
713			Ok(Some(c)) => c,
714		}
715	};
716
717	let executor_params = match executor_params_at_relay_parent(relay_parent, &mut sender).await {
718		Ok(ep) => ep,
719		Err(e) => return Err(Error::UtilError(e)),
720	};
721
722	let node_features = request_node_features(relay_parent, session_index, &mut sender)
723		.await?
724		.unwrap_or(NodeFeatures::EMPTY);
725
726	let pov = match pov {
727		PoVData::Ready(pov) => pov,
728		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } =>
729			match request_pov(
730				&mut sender,
731				relay_parent,
732				from_validator,
733				candidate.descriptor.para_id(),
734				candidate_hash,
735				pov_hash,
736			)
737			.await
738			{
739				Err(Error::FetchPoV) => {
740					tx_command
741						.send((
742							relay_parent,
743							ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
744						))
745						.await
746						.map_err(Error::BackgroundValidationMpsc)?;
747					return Ok(())
748				},
749				Err(err) => return Err(err),
750				Ok(pov) => pov,
751			},
752	};
753
754	let v = {
755		request_candidate_validation(
756			&mut sender,
757			persisted_validation_data,
758			validation_code,
759			candidate.clone(),
760			pov.clone(),
761			executor_params,
762		)
763		.await?
764	};
765
766	let res = match v {
767		ValidationResult::Valid(commitments, validation_data) => {
768			gum::debug!(
769				target: LOG_TARGET,
770				candidate_hash = ?candidate.hash(),
771				"Validation successful",
772			);
773
774			let erasure_valid = make_pov_available(
775				&mut sender,
776				n_validators,
777				pov.clone(),
778				candidate.hash(),
779				validation_data.clone(),
780				candidate.descriptor.erasure_root(),
781				core_index,
782				node_features,
783			)
784			.await;
785
786			match erasure_valid {
787				Ok(()) => Ok(BackgroundValidationOutputs {
788					candidate,
789					commitments,
790					persisted_validation_data: validation_data,
791				}),
792				Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
793					gum::debug!(
794						target: LOG_TARGET,
795						candidate_hash = ?candidate.hash(),
796						actual_commitments = ?commitments,
797						"Erasure root doesn't match the announced by the candidate receipt",
798					);
799					Err(candidate)
800				},
801				// Bubble up any other error.
802				Err(e) => return Err(e),
803			}
804		},
805		ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
806			// If validation produces a new set of commitments, we vote the candidate as invalid.
807			gum::warn!(
808				target: LOG_TARGET,
809				candidate_hash = ?candidate.hash(),
810				"Validation yielded different commitments",
811			);
812			Err(candidate)
813		},
814		ValidationResult::Invalid(reason) => {
815			gum::warn!(
816				target: LOG_TARGET,
817				candidate_hash = ?candidate.hash(),
818				reason = ?reason,
819				"Validation yielded an invalid candidate",
820			);
821			Err(candidate)
822		},
823	};
824
825	tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
826}
827
/// Dispatches a `CandidateBackingMessage` to its dedicated handler.
///
/// Errors from the `Second`, `Statement` and `GetBackableCandidates` handlers are returned to
/// the caller; the `CanSecond` handler is not propagated with `?`.
#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
async fn handle_communication<Context>(
	ctx: &mut Context,
	state: &mut State,
	message: CandidateBackingMessage,
	metrics: &Metrics,
) -> Result<(), Error> {
	match message {
		// The relay parent carried by the message is not used here.
		CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
			handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
		},
		CandidateBackingMessage::Statement(relay_parent, statement) => {
			handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
		},
		// Synchronous: this handler does not need the context.
		CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) =>
			handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?,
		CandidateBackingMessage::CanSecond(request, tx) =>
			handle_can_second_request(ctx, state, request, tx).await,
	}

	Ok(())
}
850
851#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
852async fn handle_active_leaves_update<Context>(
853	ctx: &mut Context,
854	update: ActiveLeavesUpdate,
855	state: &mut State,
856) -> Result<(), Error> {
857	enum LeafHasProspectiveParachains {
858		Enabled(Result<ProspectiveParachainsMode, ImplicitViewFetchError>),
859		Disabled,
860	}
861
862	// Activate in implicit view before deactivate, per the docs
863	// on ImplicitView, this is more efficient.
864	let res = if let Some(leaf) = update.activated {
865		// Only activate in implicit view if prospective
866		// parachains are enabled.
867		let mode = prospective_parachains_mode(ctx.sender(), leaf.hash).await?;
868
869		let leaf_hash = leaf.hash;
870		Some((
871			leaf,
872			match mode {
873				ProspectiveParachainsMode::Disabled => LeafHasProspectiveParachains::Disabled,
874				ProspectiveParachainsMode::Enabled { .. } => LeafHasProspectiveParachains::Enabled(
875					state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| mode),
876				),
877			},
878		))
879	} else {
880		None
881	};
882
883	for deactivated in update.deactivated {
884		state.per_leaf.remove(&deactivated);
885		state.implicit_view.deactivate_leaf(deactivated);
886	}
887
888	// clean up `per_relay_parent` according to ancestry
889	// of leaves. we do this so we can clean up candidates right after
890	// as a result.
891	//
892	// when prospective parachains are disabled, the implicit view is empty,
893	// which means we'll clean up everything that's not a leaf - the expected behavior
894	// for pre-asynchronous backing.
895	{
896		let remaining: HashSet<_> = state
897			.per_leaf
898			.keys()
899			.chain(state.implicit_view.all_allowed_relay_parents())
900			.collect();
901
902		state.per_relay_parent.retain(|r, _| remaining.contains(&r));
903	}
904
905	// clean up `per_candidate` according to which relay-parents
906	// are known.
907	//
908	// when prospective parachains are disabled, we clean up all candidates
909	// because we've cleaned up all relay parents. this is correct.
910	state
911		.per_candidate
912		.retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
913
914	// Get relay parents which might be fresh but might be known already
915	// that are explicit or implicit from the new active leaf.
916	let (fresh_relay_parents, leaf_mode) = match res {
917		None => return Ok(()),
918		Some((leaf, LeafHasProspectiveParachains::Disabled)) => {
919			// defensive in this case - for enabled, this manifests as an error.
920			if state.per_leaf.contains_key(&leaf.hash) {
921				return Ok(())
922			}
923
924			state
925				.per_leaf
926				.insert(leaf.hash, ActiveLeafState::new(ProspectiveParachainsMode::Disabled));
927
928			(vec![leaf.hash], ProspectiveParachainsMode::Disabled)
929		},
930		Some((leaf, LeafHasProspectiveParachains::Enabled(Ok(prospective_parachains_mode)))) => {
931			let fresh_relay_parents =
932				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
933
934			let active_leaf_state = ActiveLeafState::new(prospective_parachains_mode);
935
936			state.per_leaf.insert(leaf.hash, active_leaf_state);
937
938			let fresh_relay_parent = match fresh_relay_parents {
939				Some(f) => f.to_vec(),
940				None => {
941					gum::warn!(
942						target: LOG_TARGET,
943						leaf_hash = ?leaf.hash,
944						"Implicit view gave no relay-parents"
945					);
946
947					vec![leaf.hash]
948				},
949			};
950			(fresh_relay_parent, prospective_parachains_mode)
951		},
952		Some((leaf, LeafHasProspectiveParachains::Enabled(Err(e)))) => {
953			gum::debug!(
954				target: LOG_TARGET,
955				leaf_hash = ?leaf.hash,
956				err = ?e,
957				"Failed to load implicit view for leaf."
958			);
959
960			return Ok(())
961		},
962	};
963
964	// add entries in `per_relay_parent`. for all new relay-parents.
965	for maybe_new in fresh_relay_parents {
966		if state.per_relay_parent.contains_key(&maybe_new) {
967			continue
968		}
969
970		let mode = match state.per_leaf.get(&maybe_new) {
971			None => {
972				// If the relay-parent isn't a leaf itself,
973				// then it is guaranteed by the prospective parachains
974				// subsystem that it is an ancestor of a leaf which
975				// has prospective parachains enabled and that the
976				// block itself did.
977				leaf_mode
978			},
979			Some(l) => l.into(),
980		};
981
982		// construct a `PerRelayParent` from the runtime API
983		// and insert it.
984		let per = construct_per_relay_parent_state(
985			ctx,
986			maybe_new,
987			&state.keystore,
988			&mut state.validator_to_group_cache,
989			mode,
990		)
991		.await?;
992
993		if let Some(per) = per {
994			state.per_relay_parent.insert(maybe_new, per);
995		}
996	}
997
998	Ok(())
999}
1000
/// Unwraps the result of a runtime API request.
///
/// On `Ok` the macro evaluates to the contained value. On `Err` the error is converted into a
/// `runtime::Error` and handed to `error::log_error`; whatever that call returns as an error is
/// propagated with `?`, otherwise the enclosing function returns `Ok(None)`.
///
/// Must be used inside a function whose return type accepts both `?` and `Ok(None)`.
macro_rules! try_runtime_api {
	($request:expr) => {
		match $request {
			Err(api_error) => {
				// Only bubble up fatal errors.
				error::log_error(Err(Into::<runtime::Error>::into(api_error).into()))?;

				// We can't do candidate validation work if we don't have the
				// requisite runtime API data. But these errors should not take
				// down the node.
				return Ok(None)
			},
			Ok(response) => response,
		}
	};
}
1017
1018fn core_index_from_statement(
1019	validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1020	group_rotation_info: &GroupRotationInfo,
1021	n_cores: u32,
1022	claim_queue: &ClaimQueueSnapshot,
1023	statement: &SignedFullStatementWithPVD,
1024) -> Option<CoreIndex> {
1025	let compact_statement = statement.as_unchecked();
1026	let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1027
1028	gum::trace!(
1029		target:LOG_TARGET,
1030		?group_rotation_info,
1031		?statement,
1032		?validator_to_group,
1033		n_cores,
1034		?candidate_hash,
1035		"Extracting core index from statement"
1036	);
1037
1038	let statement_validator_index = statement.validator_index();
1039	let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1040		gum::debug!(
1041			target: LOG_TARGET,
1042			?group_rotation_info,
1043			?statement,
1044			?validator_to_group,
1045			n_cores,
1046			?candidate_hash,
1047			"Invalid validator index: {:?}",
1048			statement_validator_index
1049		);
1050		return None
1051	};
1052
1053	// First check if the statement para id matches the core assignment.
1054	let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1055
1056	if core_index.0 > n_cores {
1057		gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1058		return None
1059	}
1060
1061	if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1062		let candidate_para_id = candidate.descriptor.para_id();
1063		let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1064
1065		if !assigned_paras.any(|id| id == &candidate_para_id) {
1066			gum::debug!(
1067				target: LOG_TARGET,
1068				?candidate_hash,
1069				?core_index,
1070				assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1071				?candidate_para_id,
1072				"Invalid CoreIndex, core is not assigned to this para_id"
1073			);
1074			return None
1075		}
1076		return Some(core_index)
1077	} else {
1078		return Some(core_index)
1079	}
1080}
1081
1082/// Load the data necessary to do backing work on top of a relay-parent.
1083#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1084async fn construct_per_relay_parent_state<Context>(
1085	ctx: &mut Context,
1086	relay_parent: Hash,
1087	keystore: &KeystorePtr,
1088	validator_to_group_cache: &mut LruMap<
1089		SessionIndex,
1090		Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
1091	>,
1092	mode: ProspectiveParachainsMode,
1093) -> Result<Option<PerRelayParentState>, Error> {
1094	let parent = relay_parent;
1095
1096	let (session_index, validators, groups, cores) = futures::try_join!(
1097		request_session_index_for_child(parent, ctx.sender()).await,
1098		request_validators(parent, ctx.sender()).await,
1099		request_validator_groups(parent, ctx.sender()).await,
1100		request_from_runtime(parent, ctx.sender(), |tx| {
1101			RuntimeApiRequest::AvailabilityCores(tx)
1102		},)
1103		.await,
1104	)
1105	.map_err(Error::JoinMultiple)?;
1106
1107	let session_index = try_runtime_api!(session_index);
1108
1109	let inject_core_index = request_node_features(parent, session_index, ctx.sender())
1110		.await?
1111		.unwrap_or(NodeFeatures::EMPTY)
1112		.get(FeatureIndex::ElasticScalingMVP as usize)
1113		.map(|b| *b)
1114		.unwrap_or(false);
1115
1116	gum::debug!(target: LOG_TARGET, inject_core_index, ?parent, "New state");
1117
1118	let validators: Vec<_> = try_runtime_api!(validators);
1119	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1120	let cores = try_runtime_api!(cores);
1121	let minimum_backing_votes =
1122		try_runtime_api!(request_min_backing_votes(parent, session_index, ctx.sender()).await);
1123
1124	// TODO: https://github.com/paritytech/polkadot-sdk/issues/1940
1125	// Once runtime ver `DISABLED_VALIDATORS_RUNTIME_REQUIREMENT` is released remove this call to
1126	// `get_disabled_validators_with_fallback`, add `request_disabled_validators` call to the
1127	// `try_join!` above and use `try_runtime_api!` to get `disabled_validators`
1128	let disabled_validators =
1129		get_disabled_validators_with_fallback(ctx.sender(), parent).await.map_err(|e| {
1130			Error::UtilError(TryFrom::try_from(e).expect("the conversion is infallible; qed"))
1131		})?;
1132
1133	let maybe_claim_queue = try_runtime_api!(fetch_claim_queue(ctx.sender(), parent).await);
1134
1135	let signing_context = SigningContext { parent_hash: parent, session_index };
1136	let validator = match Validator::construct(
1137		&validators,
1138		&disabled_validators,
1139		signing_context.clone(),
1140		keystore.clone(),
1141	) {
1142		Ok(v) => Some(v),
1143		Err(util::Error::NotAValidator) => None,
1144		Err(e) => {
1145			gum::warn!(
1146				target: LOG_TARGET,
1147				err = ?e,
1148				"Cannot participate in candidate backing",
1149			);
1150
1151			return Ok(None)
1152		},
1153	};
1154
1155	let n_cores = cores.len();
1156
1157	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1158	let mut assigned_core = None;
1159
1160	let has_claim_queue = maybe_claim_queue.is_some();
1161	let mut claim_queue = maybe_claim_queue.unwrap_or_default().0;
1162
1163	for (idx, core) in cores.iter().enumerate() {
1164		let core_index = CoreIndex(idx as _);
1165
1166		if !has_claim_queue {
1167			match core {
1168				CoreState::Scheduled(scheduled) =>
1169					claim_queue.insert(core_index, [scheduled.para_id].into_iter().collect()),
1170				CoreState::Occupied(occupied) if mode.is_enabled() => {
1171					// Async backing makes it legal to build on top of
1172					// occupied core.
1173					if let Some(next) = &occupied.next_up_on_available {
1174						claim_queue.insert(core_index, [next.para_id].into_iter().collect())
1175					} else {
1176						continue
1177					}
1178				},
1179				_ => continue,
1180			};
1181		} else if !claim_queue.contains_key(&core_index) {
1182			continue
1183		}
1184
1185		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1186		if let Some(g) = validator_groups.get(group_index.0 as usize) {
1187			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1188				assigned_core = Some(core_index);
1189			}
1190			groups.insert(core_index, g.clone());
1191		}
1192	}
1193	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1194
1195	let validator_to_group = validator_to_group_cache
1196		.get_or_insert(session_index, || {
1197			let mut vector = vec![None; validators.len()];
1198
1199			for (group_idx, validator_group) in validator_groups.iter().enumerate() {
1200				for validator in validator_group {
1201					vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
1202				}
1203			}
1204
1205			Arc::new(IndexedVec::<_, _>::from(vector))
1206		})
1207		.expect("Just inserted");
1208
1209	let table_context = TableContext { validator, groups, validators, disabled_validators };
1210	let table_config = TableConfig {
1211		allow_multiple_seconded: match mode {
1212			ProspectiveParachainsMode::Enabled { .. } => true,
1213			ProspectiveParachainsMode::Disabled => false,
1214		},
1215	};
1216
1217	Ok(Some(PerRelayParentState {
1218		prospective_parachains_mode: mode,
1219		parent,
1220		session_index,
1221		assigned_core,
1222		backed: HashSet::new(),
1223		table: Table::new(table_config),
1224		table_context,
1225		issued_statements: HashSet::new(),
1226		awaiting_validation: HashSet::new(),
1227		fallbacks: HashMap::new(),
1228		minimum_backing_votes,
1229		inject_core_index,
1230		n_cores: cores.len() as u32,
1231		claim_queue: ClaimQueueSnapshot::from(claim_queue),
1232		validator_to_group: validator_to_group.clone(),
1233		group_rotation_info,
1234	}))
1235}
1236
1237enum SecondingAllowed {
1238	No,
1239	// On which leaves is seconding allowed.
1240	Yes(Vec<Hash>),
1241}
1242
1243/// Checks whether a candidate can be seconded based on its hypothetical membership in the fragment
1244/// chain.
1245#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1246async fn seconding_sanity_check<Context>(
1247	ctx: &mut Context,
1248	active_leaves: &HashMap<Hash, ActiveLeafState>,
1249	implicit_view: &ImplicitView,
1250	hypothetical_candidate: HypotheticalCandidate,
1251) -> SecondingAllowed {
1252	let mut leaves_for_seconding = Vec::new();
1253	let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1254
1255	let candidate_para = hypothetical_candidate.candidate_para();
1256	let candidate_relay_parent = hypothetical_candidate.relay_parent();
1257	let candidate_hash = hypothetical_candidate.candidate_hash();
1258
1259	for (head, leaf_state) in active_leaves {
1260		if ProspectiveParachainsMode::from(leaf_state).is_enabled() {
1261			// Check that the candidate relay parent is allowed for para, skip the
1262			// leaf otherwise.
1263			let allowed_parents_for_para =
1264				implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1265			if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1266				continue
1267			}
1268
1269			let (tx, rx) = oneshot::channel();
1270			ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1271				HypotheticalMembershipRequest {
1272					candidates: vec![hypothetical_candidate.clone()],
1273					fragment_chain_relay_parent: Some(*head),
1274				},
1275				tx,
1276			))
1277			.await;
1278			let response = rx.map_ok(move |candidate_memberships| {
1279				let is_member_or_potential = candidate_memberships
1280					.into_iter()
1281					.find_map(|(candidate, leaves)| {
1282						(candidate.candidate_hash() == candidate_hash).then_some(leaves)
1283					})
1284					.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1285					.is_some();
1286
1287				(is_member_or_potential, head)
1288			});
1289			responses.push_back(response.boxed());
1290		} else {
1291			if *head == candidate_relay_parent {
1292				if let ActiveLeafState::ProspectiveParachainsDisabled { seconded } = leaf_state {
1293					if seconded.contains(&candidate_para) {
1294						// The leaf is already occupied. For non-prospective parachains, we only
1295						// second one candidate.
1296						return SecondingAllowed::No
1297					}
1298				}
1299				responses.push_back(futures::future::ok((true, head)).boxed());
1300			}
1301		}
1302	}
1303
1304	if responses.is_empty() {
1305		return SecondingAllowed::No
1306	}
1307
1308	while let Some(response) = responses.next().await {
1309		match response {
1310			Err(oneshot::Canceled) => {
1311				gum::warn!(
1312					target: LOG_TARGET,
1313					"Failed to reach prospective parachains subsystem for hypothetical membership",
1314				);
1315
1316				return SecondingAllowed::No
1317			},
1318			Ok((is_member_or_potential, head)) => match is_member_or_potential {
1319				false => {
1320					gum::debug!(
1321						target: LOG_TARGET,
1322						?candidate_hash,
1323						leaf_hash = ?head,
1324						"Refusing to second candidate at leaf. Is not a potential member.",
1325					);
1326				},
1327				true => {
1328					leaves_for_seconding.push(*head);
1329				},
1330			},
1331		}
1332	}
1333
1334	if leaves_for_seconding.is_empty() {
1335		SecondingAllowed::No
1336	} else {
1337		SecondingAllowed::Yes(leaves_for_seconding)
1338	}
1339}
1340
1341/// Performs seconding sanity check for an advertisement.
1342#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1343async fn handle_can_second_request<Context>(
1344	ctx: &mut Context,
1345	state: &State,
1346	request: CanSecondRequest,
1347	tx: oneshot::Sender<bool>,
1348) {
1349	let relay_parent = request.candidate_relay_parent;
1350	let response = if state
1351		.per_relay_parent
1352		.get(&relay_parent)
1353		.map_or(false, |pr_state| pr_state.prospective_parachains_mode.is_enabled())
1354	{
1355		let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1356			candidate_hash: request.candidate_hash,
1357			candidate_para: request.candidate_para_id,
1358			parent_head_data_hash: request.parent_head_data_hash,
1359			candidate_relay_parent: relay_parent,
1360		};
1361
1362		let result = seconding_sanity_check(
1363			ctx,
1364			&state.per_leaf,
1365			&state.implicit_view,
1366			hypothetical_candidate,
1367		)
1368		.await;
1369
1370		match result {
1371			SecondingAllowed::No => false,
1372			SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1373		}
1374	} else {
1375		// Relay parent is unknown or async backing is disabled.
1376		false
1377	};
1378
1379	let _ = tx.send(response);
1380}
1381
1382#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1383async fn handle_validated_candidate_command<Context>(
1384	ctx: &mut Context,
1385	state: &mut State,
1386	relay_parent: Hash,
1387	command: ValidatedCandidateCommand,
1388	metrics: &Metrics,
1389) -> Result<(), Error> {
1390	match state.per_relay_parent.get_mut(&relay_parent) {
1391		Some(rp_state) => {
1392			let candidate_hash = command.candidate_hash();
1393			rp_state.awaiting_validation.remove(&candidate_hash);
1394
1395			match command {
1396				ValidatedCandidateCommand::Second(res) => match res {
1397					Ok(outputs) => {
1398						let BackgroundValidationOutputs {
1399							candidate,
1400							commitments,
1401							persisted_validation_data,
1402						} = outputs;
1403
1404						if rp_state.issued_statements.contains(&candidate_hash) {
1405							return Ok(())
1406						}
1407
1408						let receipt = CommittedCandidateReceipt {
1409							descriptor: candidate.descriptor.clone(),
1410							commitments,
1411						};
1412
1413						let hypothetical_candidate = HypotheticalCandidate::Complete {
1414							candidate_hash,
1415							receipt: Arc::new(receipt.clone()),
1416							persisted_validation_data: persisted_validation_data.clone(),
1417						};
1418						// sanity check that we're allowed to second the candidate
1419						// and that it doesn't conflict with other candidates we've
1420						// seconded.
1421						let hypothetical_membership = match seconding_sanity_check(
1422							ctx,
1423							&state.per_leaf,
1424							&state.implicit_view,
1425							hypothetical_candidate,
1426						)
1427						.await
1428						{
1429							SecondingAllowed::No => return Ok(()),
1430							SecondingAllowed::Yes(membership) => membership,
1431						};
1432
1433						let statement =
1434							StatementWithPVD::Seconded(receipt, persisted_validation_data);
1435
1436						// If we get an Error::RejectedByProspectiveParachains,
1437						// then the statement has not been distributed or imported into
1438						// the table.
1439						let res = sign_import_and_distribute_statement(
1440							ctx,
1441							rp_state,
1442							&mut state.per_candidate,
1443							statement,
1444							state.keystore.clone(),
1445							metrics,
1446						)
1447						.await;
1448
1449						if let Err(Error::RejectedByProspectiveParachains) = res {
1450							let candidate_hash = candidate.hash();
1451							gum::debug!(
1452								target: LOG_TARGET,
1453								relay_parent = ?candidate.descriptor().relay_parent(),
1454								?candidate_hash,
1455								"Attempted to second candidate but was rejected by prospective parachains",
1456							);
1457
1458							// Ensure the collator is reported.
1459							ctx.send_message(CollatorProtocolMessage::Invalid(
1460								candidate.descriptor().relay_parent(),
1461								candidate,
1462							))
1463							.await;
1464
1465							return Ok(())
1466						}
1467
1468						if let Some(stmt) = res? {
1469							match state.per_candidate.get_mut(&candidate_hash) {
1470								None => {
1471									gum::warn!(
1472										target: LOG_TARGET,
1473										?candidate_hash,
1474										"Missing `per_candidate` for seconded candidate.",
1475									);
1476								},
1477								Some(p) => p.seconded_locally = true,
1478							}
1479
1480							// record seconded candidates for non-prospective-parachains mode.
1481							for leaf in hypothetical_membership {
1482								let leaf_data = match state.per_leaf.get_mut(&leaf) {
1483									None => {
1484										gum::warn!(
1485											target: LOG_TARGET,
1486											leaf_hash = ?leaf,
1487											"Missing `per_leaf` for known active leaf."
1488										);
1489
1490										continue
1491									},
1492									Some(d) => d,
1493								};
1494
1495								leaf_data.add_seconded_candidate(candidate.descriptor().para_id());
1496							}
1497
1498							rp_state.issued_statements.insert(candidate_hash);
1499
1500							metrics.on_candidate_seconded();
1501							ctx.send_message(CollatorProtocolMessage::Seconded(
1502								rp_state.parent,
1503								StatementWithPVD::drop_pvd_from_signed(stmt),
1504							))
1505							.await;
1506						}
1507					},
1508					Err(candidate) => {
1509						ctx.send_message(CollatorProtocolMessage::Invalid(
1510							rp_state.parent,
1511							candidate,
1512						))
1513						.await;
1514					},
1515				},
1516				ValidatedCandidateCommand::Attest(res) => {
1517					// We are done - avoid new validation spawns:
1518					rp_state.fallbacks.remove(&candidate_hash);
1519					// sanity check.
1520					if !rp_state.issued_statements.contains(&candidate_hash) {
1521						if res.is_ok() {
1522							let statement = StatementWithPVD::Valid(candidate_hash);
1523
1524							sign_import_and_distribute_statement(
1525								ctx,
1526								rp_state,
1527								&mut state.per_candidate,
1528								statement,
1529								state.keystore.clone(),
1530								metrics,
1531							)
1532							.await?;
1533						}
1534						rp_state.issued_statements.insert(candidate_hash);
1535					}
1536				},
1537				ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1538					if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1539						if let Some(index) = attesting.backing.pop() {
1540							attesting.from_validator = index;
1541							let attesting = attesting.clone();
1542
1543							// The candidate state should be available because we've
1544							// validated it before, the relay-parent is still around,
1545							// and candidates are pruned on the basis of relay-parents.
1546							//
1547							// If it's not, then no point in validating it anyway.
1548							if let Some(pvd) = state
1549								.per_candidate
1550								.get(&candidate_hash)
1551								.map(|pc| pc.persisted_validation_data.clone())
1552							{
1553								kick_off_validation_work(
1554									ctx,
1555									rp_state,
1556									pvd,
1557									&state.background_validation_tx,
1558									attesting,
1559								)
1560								.await?;
1561							}
1562						}
1563					} else {
1564						gum::warn!(
1565							target: LOG_TARGET,
1566							"AttestNoPoV was triggered without fallback being available."
1567						);
1568						debug_assert!(false);
1569					}
1570				},
1571			}
1572		},
1573		None => {
1574			// simple race condition; can be ignored = this relay-parent
1575			// is no longer relevant.
1576		},
1577	}
1578
1579	Ok(())
1580}
1581
1582fn sign_statement(
1583	rp_state: &PerRelayParentState,
1584	statement: StatementWithPVD,
1585	keystore: KeystorePtr,
1586	metrics: &Metrics,
1587) -> Option<SignedFullStatementWithPVD> {
1588	let signed = rp_state
1589		.table_context
1590		.validator
1591		.as_ref()?
1592		.sign(keystore, statement)
1593		.ok()
1594		.flatten()?;
1595	metrics.on_statement_signed();
1596	Some(signed)
1597}
1598
1599/// Import a statement into the statement table and return the summary of the import.
1600///
1601/// This will fail with `Error::RejectedByProspectiveParachains` if the message type
1602/// is seconded, the candidate is fresh,
1603/// and any of the following are true:
1604/// 1. There is no `PersistedValidationData` attached.
1605/// 2. Prospective parachains are enabled for the relay parent and the prospective parachains
1606///    subsystem returned an empty `HypotheticalMembership` i.e. did not recognize the candidate as
1607///    being applicable to any of the active leaves.
1608#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1609async fn import_statement<Context>(
1610	ctx: &mut Context,
1611	rp_state: &mut PerRelayParentState,
1612	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1613	statement: &SignedFullStatementWithPVD,
1614) -> Result<Option<TableSummary>, Error> {
1615	let candidate_hash = statement.payload().candidate_hash();
1616
1617	gum::debug!(
1618		target: LOG_TARGET,
1619		statement = ?statement.payload().to_compact(),
1620		validator_index = statement.validator_index().0,
1621		?candidate_hash,
1622		"Importing statement",
1623	);
1624
1625	// If this is a new candidate (statement is 'seconded' and candidate is unknown),
1626	// we need to create an entry in the `PerCandidateState` map.
1627	//
1628	// If the relay parent supports prospective parachains, we also need
1629	// to inform the prospective parachains subsystem of the seconded candidate.
1630	// If `ProspectiveParachainsMessage::Second` fails, then we return
1631	// Error::RejectedByProspectiveParachains.
1632	//
1633	// Persisted Validation Data should be available - it may already be available
1634	// if this is a candidate we are seconding.
1635	//
1636	// We should also not accept any candidates which have no valid depths under any of
1637	// our active leaves.
1638	if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1639		if !per_candidate.contains_key(&candidate_hash) {
1640			if rp_state.prospective_parachains_mode.is_enabled() {
1641				let (tx, rx) = oneshot::channel();
1642				ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1643					IntroduceSecondedCandidateRequest {
1644						candidate_para: candidate.descriptor.para_id(),
1645						candidate_receipt: candidate.clone(),
1646						persisted_validation_data: pvd.clone(),
1647					},
1648					tx,
1649				))
1650				.await;
1651
1652				match rx.await {
1653					Err(oneshot::Canceled) => {
1654						gum::warn!(
1655							target: LOG_TARGET,
1656							"Could not reach the Prospective Parachains subsystem."
1657						);
1658
1659						return Err(Error::RejectedByProspectiveParachains)
1660					},
1661					Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1662					Ok(true) => {},
1663				}
1664			}
1665
1666			// Only save the candidate if it was approved by prospective parachains.
1667			per_candidate.insert(
1668				candidate_hash,
1669				PerCandidateState {
1670					persisted_validation_data: pvd.clone(),
1671					// This is set after importing when seconding locally.
1672					seconded_locally: false,
1673					relay_parent: candidate.descriptor.relay_parent(),
1674				},
1675			);
1676		}
1677	}
1678
1679	let stmt = primitive_statement_to_table(statement);
1680
1681	let core = core_index_from_statement(
1682		&rp_state.validator_to_group,
1683		&rp_state.group_rotation_info,
1684		rp_state.n_cores,
1685		&rp_state.claim_queue,
1686		statement,
1687	)
1688	.ok_or(Error::CoreIndexUnavailable)?;
1689
1690	Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1691}
1692
1693/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
1694/// misbehaviors as a result of importing a statement.
1695#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1696async fn post_import_statement_actions<Context>(
1697	ctx: &mut Context,
1698	rp_state: &mut PerRelayParentState,
1699	summary: Option<&TableSummary>,
1700) {
1701	if let Some(attested) = summary.as_ref().and_then(|s| {
1702		rp_state.table.attested_candidate(
1703			&s.candidate,
1704			&rp_state.table_context,
1705			rp_state.minimum_backing_votes,
1706		)
1707	}) {
1708		let candidate_hash = attested.candidate.hash();
1709
1710		// `HashSet::insert` returns true if the thing wasn't in there already.
1711		if rp_state.backed.insert(candidate_hash) {
1712			if let Some(backed) = table_attested_to_backed(
1713				attested,
1714				&rp_state.table_context,
1715				rp_state.inject_core_index,
1716			) {
1717				let para_id = backed.candidate().descriptor.para_id();
1718				gum::debug!(
1719					target: LOG_TARGET,
1720					candidate_hash = ?candidate_hash,
1721					relay_parent = ?rp_state.parent,
1722					%para_id,
1723					"Candidate backed",
1724				);
1725
1726				if rp_state.prospective_parachains_mode.is_enabled() {
1727					// Inform the prospective parachains subsystem
1728					// that the candidate is now backed.
1729					ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1730						para_id,
1731						candidate_hash,
1732					))
1733					.await;
1734					// Notify statement distribution of backed candidate.
1735					ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1736				} else {
1737					// The provisioner waits on candidate-backing, which means
1738					// that we need to send unbounded messages to avoid cycles.
1739					//
1740					// Backed candidates are bounded by the number of validators,
1741					// parachains, and the block production rate of the relay chain.
1742					let message = ProvisionerMessage::ProvisionableData(
1743						rp_state.parent,
1744						ProvisionableData::BackedCandidate(backed.receipt()),
1745					);
1746					ctx.send_unbounded_message(message);
1747				}
1748			} else {
1749				gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1750			}
1751		} else {
1752			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1753		}
1754	} else {
1755		gum::debug!(target: LOG_TARGET, "No attested candidate");
1756	}
1757
1758	issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1759}
1760
1761/// Check if there have happened any new misbehaviors and issue necessary messages.
1762#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1763fn issue_new_misbehaviors<Context>(
1764	ctx: &mut Context,
1765	relay_parent: Hash,
1766	table: &mut Table<TableContext>,
1767) {
1768	// collect the misbehaviors to avoid double mutable self borrow issues
1769	let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1770	for (validator_id, report) in misbehaviors {
1771		// The provisioner waits on candidate-backing, which means
1772		// that we need to send unbounded messages to avoid cycles.
1773		//
1774		// Misbehaviors are bounded by the number of validators and
1775		// the block production protocol.
1776		ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1777			relay_parent,
1778			ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1779		));
1780	}
1781}
1782
1783/// Sign, import, and distribute a statement.
1784#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1785async fn sign_import_and_distribute_statement<Context>(
1786	ctx: &mut Context,
1787	rp_state: &mut PerRelayParentState,
1788	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1789	statement: StatementWithPVD,
1790	keystore: KeystorePtr,
1791	metrics: &Metrics,
1792) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1793	if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1794		let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1795
1796		// `Share` must always be sent before `Backed`. We send the latter in
1797		// `post_import_statement_action` below.
1798		let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1799		ctx.send_unbounded_message(smsg);
1800
1801		post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1802
1803		Ok(Some(signed_statement))
1804	} else {
1805		Ok(None)
1806	}
1807}
1808
1809#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1810async fn background_validate_and_make_available<Context>(
1811	ctx: &mut Context,
1812	rp_state: &mut PerRelayParentState,
1813	params: BackgroundValidationParams<
1814		impl overseer::CandidateBackingSenderTrait,
1815		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1816	>,
1817) -> Result<(), Error> {
1818	let candidate_hash = params.candidate.hash();
1819	let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1820	if rp_state.awaiting_validation.insert(candidate_hash) {
1821		// spawn background task.
1822		let bg = async move {
1823			if let Err(error) = validate_and_make_available(params, core_index).await {
1824				if let Error::BackgroundValidationMpsc(error) = error {
1825					gum::debug!(
1826						target: LOG_TARGET,
1827						?candidate_hash,
1828						?error,
1829						"Mpsc background validation mpsc died during validation- leaf no longer active?"
1830					);
1831				} else {
1832					gum::error!(
1833						target: LOG_TARGET,
1834						?candidate_hash,
1835						?error,
1836						"Failed to validate and make available",
1837					);
1838				}
1839			}
1840		};
1841
1842		ctx.spawn("backing-validation", bg.boxed())
1843			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1844	}
1845
1846	Ok(())
1847}
1848
1849/// Kick off validation work and distribute the result as a signed statement.
1850#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1851async fn kick_off_validation_work<Context>(
1852	ctx: &mut Context,
1853	rp_state: &mut PerRelayParentState,
1854	persisted_validation_data: PersistedValidationData,
1855	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1856	attesting: AttestingData,
1857) -> Result<(), Error> {
1858	// Do nothing if the local validator is disabled or not a validator at all
1859	match rp_state.table_context.local_validator_is_disabled() {
1860		Some(true) => {
1861			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1862			return Ok(())
1863		},
1864		Some(false) => {}, // we are not disabled - move on
1865		None => {
1866			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1867			return Ok(())
1868		},
1869	}
1870
1871	let candidate_hash = attesting.candidate.hash();
1872	if rp_state.issued_statements.contains(&candidate_hash) {
1873		return Ok(())
1874	}
1875
1876	gum::debug!(
1877		target: LOG_TARGET,
1878		candidate_hash = ?candidate_hash,
1879		candidate_receipt = ?attesting.candidate,
1880		"Kicking off validation",
1881	);
1882
1883	let bg_sender = ctx.sender().clone();
1884	let pov = PoVData::FetchFromValidator {
1885		from_validator: attesting.from_validator,
1886		candidate_hash,
1887		pov_hash: attesting.pov_hash,
1888	};
1889
1890	background_validate_and_make_available(
1891		ctx,
1892		rp_state,
1893		BackgroundValidationParams {
1894			sender: bg_sender,
1895			tx_command: background_validation_tx.clone(),
1896			candidate: attesting.candidate,
1897			relay_parent: rp_state.parent,
1898			session_index: rp_state.session_index,
1899			persisted_validation_data,
1900			pov,
1901			n_validators: rp_state.table_context.validators.len(),
1902			make_command: ValidatedCandidateCommand::Attest,
1903		},
1904	)
1905	.await
1906}
1907
1908/// Import the statement and kick off validation work if it is a part of our assignment.
1909#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1910async fn maybe_validate_and_import<Context>(
1911	ctx: &mut Context,
1912	state: &mut State,
1913	relay_parent: Hash,
1914	statement: SignedFullStatementWithPVD,
1915) -> Result<(), Error> {
1916	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1917		Some(r) => r,
1918		None => {
1919			gum::trace!(
1920				target: LOG_TARGET,
1921				?relay_parent,
1922				"Received statement for unknown relay-parent"
1923			);
1924
1925			return Ok(())
1926		},
1927	};
1928
1929	// Don't import statement if the sender is disabled
1930	if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1931		gum::debug!(
1932			target: LOG_TARGET,
1933			sender_validator_idx = ?statement.validator_index(),
1934			"Not importing statement because the sender is disabled"
1935		);
1936		return Ok(())
1937	}
1938
1939	let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1940
1941	// if we get an Error::RejectedByProspectiveParachains,
1942	// we will do nothing.
1943	if let Err(Error::RejectedByProspectiveParachains) = res {
1944		gum::debug!(
1945			target: LOG_TARGET,
1946			?relay_parent,
1947			"Statement rejected by prospective parachains."
1948		);
1949
1950		return Ok(())
1951	}
1952
1953	let summary = res?;
1954	post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1955
1956	if let Some(summary) = summary {
1957		// import_statement already takes care of communicating with the
1958		// prospective parachains subsystem. At this point, the candidate
1959		// has already been accepted by the subsystem.
1960
1961		let candidate_hash = summary.candidate;
1962
1963		if Some(summary.group_id) != rp_state.assigned_core {
1964			return Ok(())
1965		}
1966
1967		let attesting = match statement.payload() {
1968			StatementWithPVD::Seconded(receipt, _) => {
1969				let attesting = AttestingData {
1970					candidate: rp_state
1971						.table
1972						.get_candidate(&candidate_hash)
1973						.ok_or(Error::CandidateNotFound)?
1974						.to_plain(),
1975					pov_hash: receipt.descriptor.pov_hash(),
1976					from_validator: statement.validator_index(),
1977					backing: Vec::new(),
1978				};
1979				rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1980				attesting
1981			},
1982			StatementWithPVD::Valid(candidate_hash) => {
1983				if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1984					let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1985					if our_index == Some(statement.validator_index()) {
1986						return Ok(())
1987					}
1988
1989					if rp_state.awaiting_validation.contains(candidate_hash) {
1990						// Job already running:
1991						attesting.backing.push(statement.validator_index());
1992						return Ok(())
1993					} else {
1994						// No job, so start another with current validator:
1995						attesting.from_validator = statement.validator_index();
1996						attesting.clone()
1997					}
1998				} else {
1999					return Ok(())
2000				}
2001			},
2002		};
2003
2004		// After `import_statement` succeeds, the candidate entry is guaranteed
2005		// to exist.
2006		if let Some(pvd) = state
2007			.per_candidate
2008			.get(&candidate_hash)
2009			.map(|pc| pc.persisted_validation_data.clone())
2010		{
2011			kick_off_validation_work(
2012				ctx,
2013				rp_state,
2014				pvd,
2015				&state.background_validation_tx,
2016				attesting,
2017			)
2018			.await?;
2019		}
2020	}
2021	Ok(())
2022}
2023
2024/// Kick off background validation with intent to second.
2025#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2026async fn validate_and_second<Context>(
2027	ctx: &mut Context,
2028	rp_state: &mut PerRelayParentState,
2029	persisted_validation_data: PersistedValidationData,
2030	candidate: &CandidateReceipt,
2031	pov: Arc<PoV>,
2032	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
2033) -> Result<(), Error> {
2034	let candidate_hash = candidate.hash();
2035
2036	gum::debug!(
2037		target: LOG_TARGET,
2038		candidate_hash = ?candidate_hash,
2039		candidate_receipt = ?candidate,
2040		"Validate and second candidate",
2041	);
2042
2043	let bg_sender = ctx.sender().clone();
2044	background_validate_and_make_available(
2045		ctx,
2046		rp_state,
2047		BackgroundValidationParams {
2048			sender: bg_sender,
2049			tx_command: background_validation_tx.clone(),
2050			candidate: candidate.clone(),
2051			relay_parent: rp_state.parent,
2052			session_index: rp_state.session_index,
2053			persisted_validation_data,
2054			pov: PoVData::Ready(pov),
2055			n_validators: rp_state.table_context.validators.len(),
2056			make_command: ValidatedCandidateCommand::Second,
2057		},
2058	)
2059	.await?;
2060
2061	Ok(())
2062}
2063
2064#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2065async fn handle_second_message<Context>(
2066	ctx: &mut Context,
2067	state: &mut State,
2068	candidate: CandidateReceipt,
2069	persisted_validation_data: PersistedValidationData,
2070	pov: PoV,
2071	metrics: &Metrics,
2072) -> Result<(), Error> {
2073	let _timer = metrics.time_process_second();
2074
2075	let candidate_hash = candidate.hash();
2076	let relay_parent = candidate.descriptor().relay_parent();
2077
2078	if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2079		gum::warn!(
2080			target: LOG_TARGET,
2081			?candidate_hash,
2082			"Candidate backing was asked to second candidate with wrong PVD",
2083		);
2084
2085		return Ok(())
2086	}
2087
2088	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2089		None => {
2090			gum::trace!(
2091				target: LOG_TARGET,
2092				?relay_parent,
2093				?candidate_hash,
2094				"We were asked to second a candidate outside of our view."
2095			);
2096
2097			return Ok(())
2098		},
2099		Some(r) => r,
2100	};
2101
2102	// Just return if the local validator is disabled. If we are here the local node should be a
2103	// validator but defensively use `unwrap_or(false)` to continue processing in this case.
2104	if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2105		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2106		return Ok(())
2107	}
2108
2109	let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2110
2111	// Sanity check that candidate is from our assignment.
2112	if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2113		gum::debug!(
2114			target: LOG_TARGET,
2115			our_assignment_core = ?rp_state.assigned_core,
2116			our_assignment_paras = ?assigned_paras,
2117			collation = ?candidate.descriptor().para_id(),
2118			"Subsystem asked to second for para outside of our assignment",
2119		);
2120		return Ok(());
2121	}
2122
2123	gum::debug!(
2124		target: LOG_TARGET,
2125		our_assignment_core = ?rp_state.assigned_core,
2126		our_assignment_paras = ?assigned_paras,
2127		collation = ?candidate.descriptor().para_id(),
2128		"Current assignments vs collation",
2129	);
2130
2131	// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
2132	// Seconded statement only if we have not signed a Valid statement for the requested candidate.
2133	//
2134	// The actual logic of issuing the signed statement checks that this isn't
2135	// conflicting with other seconded candidates. Not doing that check here
2136	// gives other subsystems the ability to get us to execute arbitrary candidates,
2137	// but no more.
2138	if !rp_state.issued_statements.contains(&candidate_hash) {
2139		let pov = Arc::new(pov);
2140
2141		validate_and_second(
2142			ctx,
2143			rp_state,
2144			persisted_validation_data,
2145			&candidate,
2146			pov,
2147			&state.background_validation_tx,
2148		)
2149		.await?;
2150	}
2151
2152	Ok(())
2153}
2154
2155#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2156async fn handle_statement_message<Context>(
2157	ctx: &mut Context,
2158	state: &mut State,
2159	relay_parent: Hash,
2160	statement: SignedFullStatementWithPVD,
2161	metrics: &Metrics,
2162) -> Result<(), Error> {
2163	let _timer = metrics.time_process_statement();
2164
2165	// Validator disabling is handled in `maybe_validate_and_import`
2166	match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2167		Err(Error::ValidationFailed(_)) => Ok(()),
2168		Err(e) => Err(e),
2169		Ok(()) => Ok(()),
2170	}
2171}
2172
2173fn handle_get_backable_candidates_message(
2174	state: &State,
2175	requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2176	tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2177	metrics: &Metrics,
2178) -> Result<(), Error> {
2179	let _timer = metrics.time_get_backed_candidates();
2180
2181	let mut backed = HashMap::with_capacity(requested_candidates.len());
2182
2183	for (para_id, para_candidates) in requested_candidates {
2184		for (candidate_hash, relay_parent) in para_candidates.iter() {
2185			let rp_state = match state.per_relay_parent.get(&relay_parent) {
2186				Some(rp_state) => rp_state,
2187				None => {
2188					gum::debug!(
2189						target: LOG_TARGET,
2190						?relay_parent,
2191						?candidate_hash,
2192						"Requested candidate's relay parent is out of view",
2193					);
2194					break
2195				},
2196			};
2197			let maybe_backed_candidate = rp_state
2198				.table
2199				.attested_candidate(
2200					candidate_hash,
2201					&rp_state.table_context,
2202					rp_state.minimum_backing_votes,
2203				)
2204				.and_then(|attested| {
2205					table_attested_to_backed(
2206						attested,
2207						&rp_state.table_context,
2208						rp_state.inject_core_index,
2209					)
2210				});
2211
2212			if let Some(backed_candidate) = maybe_backed_candidate {
2213				backed
2214					.entry(para_id)
2215					.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2216					.push(backed_candidate);
2217			} else {
2218				break
2219			}
2220		}
2221	}
2222
2223	tx.send(backed).map_err(|data| Error::Send(data))?;
2224	Ok(())
2225}