Skip to main content

polkadot_node_core_backing/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Implements the `CandidateBackingSubsystem`.
18//!
19//! This subsystem maintains the entire responsibility of tracking parachain
20//! candidates which can be backed, as well as the issuance of statements
21//! about candidates when run on a validator node.
22//!
23//! There are two types of statements: `Seconded` and `Valid`.
24//! `Seconded` implies `Valid`, and nothing should be stated as
25//! `Valid` unless its already been `Seconded`.
26//!
27//! Validators may only second candidates which fall under their own group
28//! assignment, and they may only second one candidate per depth per active leaf.
29//! Candidates which are stated as either `Second` or `Valid` by a majority of the
30//! assigned group of validators may be backed on-chain and proceed to the availability
31//! stage.
32//!
33//! Depth is a concept relating to asynchronous backing, by which
34//! short sub-chains of candidates are backed and extended off-chain, and then placed
35//! asynchronously into blocks of the relay chain as those are authored and as the
36//! relay-chain state becomes ready for them. Asynchronous backing allows parachains to
37//! grow mostly independently from the state of the relay chain, which gives more time for
38//! parachains to be validated and thereby increases performance.
39//!
40//! Most of the work of asynchronous backing is handled by the Prospective Parachains
41//! subsystem. The 'depth' of a parachain block with respect to a relay chain block is
42//! a measure of how many parachain blocks are between the most recent included parachain block
43//! in the post-state of the relay-chain block and the candidate. For instance,
44//! a candidate that descends directly from the most recent parachain block in the relay-chain
45//! state has depth 0. The child of that candidate would have depth 1. And so on.
46//!
47//! The candidate backing subsystem keeps track of a set of 'active leaves' which are the
48//! most recent blocks in the relay-chain (which is in fact a tree) which could be built
49//! upon. Depth is always measured against active leaves, and the valid relay-parent that
50//! each candidate can have is determined by the active leaves. The Prospective Parachains
51//! subsystem enforces that the relay-parent increases monotonically, so that logic
52//! is not handled here. By communicating with the Prospective Parachains subsystem,
53//! this subsystem extrapolates an "implicit view" from the set of currently active leaves,
54//! which determines the set of all recent relay-chain block hashes which could be relay-parents
55//! for candidates backed in children of the active leaves.
56//!
57//! In fact, this subsystem relies on the Statement Distribution subsystem to prevent spam
58//! by enforcing the rule that each validator may second at most one candidate per depth per
59//! active leaf. This bounds the number of candidates that the system needs to consider and
60//! is not handled within this subsystem, except for candidates seconded locally.
61//!
62//! This subsystem also handles relay-chain heads which don't support asynchronous backing.
63//! For such active leaves, the only valid relay-parent is the leaf hash itself and the only
64//! allowed depth is 0.
65
66#![deny(unused_crate_dependencies)]
67
68use std::{
69	collections::{HashMap, HashSet},
70	sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75	channel::{mpsc, oneshot},
76	future::BoxFuture,
77	stream::FuturesOrdered,
78	FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use polkadot_node_primitives::{
84	AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
85	ValidationResult, DISPUTE_WINDOW,
86};
87use polkadot_node_subsystem::{
88	messages::{
89		AvailabilityDistributionMessage, AvailabilityStoreMessage, BackableCandidateRef,
90		CanSecondRequest, CandidateBackingMessage, CandidateValidationMessage,
91		CollatorProtocolMessage, HypotheticalCandidate, HypotheticalMembershipRequest,
92		IntroduceSecondedCandidateRequest, ProspectiveParachainsMessage, ProvisionableData,
93		ProvisionerMessage, PvfExecKind, RuntimeApiMessage, RuntimeApiRequest,
94		StatementDistributionMessage, StoreAvailableDataError,
95	},
96	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
97	SubsystemError,
98};
99use polkadot_node_subsystem_util::{
100	self as util,
101	backing_implicit_view::View as ImplicitView,
102	request_claim_queue, request_disabled_validators, request_min_backing_votes,
103	request_node_features, request_session_executor_params, request_session_index_for_child,
104	request_validator_groups, request_validators,
105	runtime::{self, ClaimQueueSnapshot},
106	Error as UtilError, Validator,
107};
108use polkadot_parachain_primitives::primitives::IsSystem;
109use polkadot_primitives::{
110	node_features::FeatureIndex, BackedCandidate, CandidateCommitments, CandidateDescriptorV2,
111	CandidateHash, CandidateReceiptV2 as CandidateReceipt,
112	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, ExecutorParams,
113	GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures,
114	PersistedValidationData, SessionIndex, SigningContext, ValidationCode, ValidatorId,
115	ValidatorIndex, ValidatorSignature, ValidityAttestation,
116};
117use polkadot_statement_table::{
118	generic::AttestedCandidate as TableAttestedCandidate,
119	v2::{
120		SignedStatement as TableSignedStatement, Statement as TableStatement,
121		Summary as TableSummary,
122	},
123	Context as TableContextTrait, Table,
124};
125use sp_keystore::KeystorePtr;
126
127mod error;
128
129mod metrics;
130use self::metrics::Metrics;
131
132#[cfg(test)]
133mod tests;
134
135const LOG_TARGET: &str = "parachain::candidate-backing";
136
137/// PoV data to validate.
138enum PoVData {
139	/// Already available (from candidate selection).
140	Ready(Arc<PoV>),
141	/// Needs to be fetched from validator (we are checking a signed statement).
142	FetchFromValidator {
143		from_validator: ValidatorIndex,
144		candidate_hash: CandidateHash,
145		pov_hash: Hash,
146	},
147}
148
149enum ValidatedCandidateCommand {
150	// We were instructed to second the candidate that has been already validated.
151	Second(BackgroundValidationResult),
152	// We were instructed to validate the candidate.
153	Attest(BackgroundValidationResult),
154	// We were not able to `Attest` because backing validator did not send us the PoV.
155	AttestNoPoV(CandidateHash),
156}
157
158impl std::fmt::Debug for ValidatedCandidateCommand {
159	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
160		let candidate_hash = self.candidate_hash();
161		match *self {
162			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
163			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
164			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
165		}
166	}
167}
168
169impl ValidatedCandidateCommand {
170	fn candidate_hash(&self) -> CandidateHash {
171		match *self {
172			ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
173			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
174			ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
175			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
176			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
177		}
178	}
179}
180
181/// The candidate backing subsystem.
182pub struct CandidateBackingSubsystem {
183	keystore: KeystorePtr,
184	metrics: Metrics,
185}
186
187impl CandidateBackingSubsystem {
188	/// Create a new instance of the `CandidateBackingSubsystem`.
189	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
190		Self { keystore, metrics }
191	}
192}
193
194#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
195impl<Context> CandidateBackingSubsystem
196where
197	Context: Send + Sync,
198{
199	fn start(self, ctx: Context) -> SpawnedSubsystem {
200		let future = async move {
201			run(ctx, self.keystore, self.metrics)
202				.await
203				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
204		}
205		.boxed();
206
207		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
208	}
209}
210
211struct PerSchedulingParentState {
212	/// The hash of the scheduling parent on top of which this job is doing it's work.
213	parent: Hash,
214	/// The node features.
215	node_features: NodeFeatures,
216	/// The `CoreIndex` assigned to the local validator at this scheduling parent.
217	assigned_core: Option<CoreIndex>,
218	/// The candidates that are backed by enough validators in their group, by hash.
219	backed: HashSet<CandidateHash>,
220	/// The table of candidates and statements under this relay-parent.
221	table: Table<TableContext>,
222	/// The table context, including groups.
223	table_context: TableContext,
224	/// We issued `Seconded` or `Valid` statements on about these candidates.
225	issued_statements: HashSet<CandidateHash>,
226	/// These candidates are undergoing validation in the background.
227	awaiting_validation: HashSet<CandidateHash>,
228	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
229	fallbacks: HashMap<CandidateHash, AttestingData>,
230	/// The minimum backing votes threshold.
231	minimum_backing_votes: u32,
232	/// The number of cores.
233	n_cores: u32,
234	/// Claim queue state. If the runtime API is not available, it'll be populated with info from
235	/// availability cores.
236	claim_queue: ClaimQueueSnapshot,
237	/// The validator index -> group mapping at this scheduling parent.
238	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
239	/// Session index for this scheduling parent. Used as fallback for V1 candidates
240	/// where session_index is not in the descriptor. For V1, scheduling_parent == relay_parent.
241	session_index: SessionIndex,
242	/// The associated group rotation information.
243	group_rotation_info: GroupRotationInfo,
244}
245
246struct PerCandidateState {
247	persisted_validation_data: PersistedValidationData,
248	seconded_locally: bool,
249	scheduling_parent: Hash,
250}
251
252/// A cache for storing data per-session to reduce repeated
253/// runtime API calls and avoid redundant computations.
254struct PerSessionCache {
255	/// Cache for storing validators list, retrieved from the runtime.
256	validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
257	/// Cache for storing node features, retrieved from the runtime.
258	node_features_cache: LruMap<SessionIndex, NodeFeatures>,
259	/// Cache for storing executor parameters, retrieved from the runtime.
260	executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
261	/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
262	minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
263	/// Cache for storing validator-to-group mappings, computed from validator groups.
264	validator_to_group_cache:
265		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
266}
267
268impl Default for PerSessionCache {
269	/// Creates a new `PerSessionCache` with a default capacity.
270	// TODO: Use the actual session window for allowed relay parents once
271	// https://github.com/paritytech/polkadot-sdk/issues/11207 is available,
272	// instead of DISPUTE_WINDOW.
273	fn default() -> Self {
274		Self::new(DISPUTE_WINDOW.get())
275	}
276}
277
278impl PerSessionCache {
279	/// Creates a new `PerSessionCache` with a given capacity.
280	fn new(capacity: u32) -> Self {
281		PerSessionCache {
282			validators_cache: LruMap::new(ByLength::new(capacity)),
283			node_features_cache: LruMap::new(ByLength::new(capacity)),
284			executor_params_cache: LruMap::new(ByLength::new(capacity)),
285			minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
286			validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
287		}
288	}
289
290	/// Gets validators from the cache or fetches them from the runtime if not present.
291	async fn validators(
292		&mut self,
293		session_index: SessionIndex,
294		parent: Hash,
295		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
296	) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
297		// Try to get the validators list from the cache.
298		if let Some(validators) = self.validators_cache.get(&session_index) {
299			return Ok(Arc::clone(validators));
300		}
301
302		// Fetch the validators list from the runtime since it was not in the cache.
303		let validators: Vec<ValidatorId> =
304			request_validators(parent, sender).await.await.map_err(|err| {
305				RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
306			})??;
307
308		// Wrap the validators list in an Arc to avoid a deep copy when storing it in the cache.
309		let validators = Arc::new(validators);
310
311		// Cache the fetched validators list for future use.
312		self.validators_cache.insert(session_index, Arc::clone(&validators));
313
314		Ok(validators)
315	}
316
317	/// Gets the node features from the cache or fetches it from the runtime if not present.
318	async fn node_features(
319		&mut self,
320		session_index: SessionIndex,
321		parent: Hash,
322		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
323	) -> Result<NodeFeatures, RuntimeApiError> {
324		// Try to get the node features from the cache.
325		if let Some(node_features) = self.node_features_cache.get(&session_index) {
326			return Ok(node_features.clone());
327		}
328
329		// Fetch the node features from the runtime since it was not in the cache.
330		let node_features = request_node_features(parent, session_index, sender)
331			.await
332			.await
333			.map_err(|err| RuntimeApiError::Execution {
334				runtime_api_name: "NodeFeatures",
335				source: Arc::new(err),
336			})??;
337
338		// Cache the fetched node features for future use.
339		self.node_features_cache.insert(session_index, node_features.clone());
340
341		Ok(node_features)
342	}
343
344	/// Gets the executor parameters from the cache or
345	/// fetches them from the runtime if not present.
346	async fn executor_params(
347		&mut self,
348		session_index: SessionIndex,
349		parent: Hash,
350		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
351	) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
352		// Try to get the executor parameters from the cache.
353		if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
354			return Ok(Arc::clone(executor_params));
355		}
356
357		// Fetch the executor parameters from the runtime since it was not in the cache.
358		let executor_params = request_session_executor_params(parent, session_index, sender)
359			.await
360			.await
361			.map_err(|err| RuntimeApiError::Execution {
362				runtime_api_name: "SessionExecutorParams",
363				source: Arc::new(err),
364			})??
365			.ok_or_else(|| RuntimeApiError::Execution {
366				runtime_api_name: "SessionExecutorParams",
367				source: Arc::new(Error::MissingExecutorParams),
368			})?;
369
370		// Wrap the executor parameters in an Arc to avoid a deep copy when storing it in the cache.
371		let executor_params = Arc::new(executor_params);
372
373		// Cache the fetched executor parameters for future use.
374		self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
375
376		Ok(executor_params)
377	}
378
379	/// Gets the minimum backing votes threshold from the
380	/// cache or fetches it from the runtime if not present.
381	async fn minimum_backing_votes(
382		&mut self,
383		session_index: SessionIndex,
384		parent: Hash,
385		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
386	) -> Result<u32, RuntimeApiError> {
387		// Try to get the value from the cache.
388		if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
389			return Ok(*minimum_backing_votes);
390		}
391
392		// Fetch the value from the runtime since it was not in the cache.
393		let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
394			.await
395			.await
396			.map_err(|err| RuntimeApiError::Execution {
397				runtime_api_name: "MinimumBackingVotes",
398				source: Arc::new(err),
399			})??;
400
401		// Cache the fetched value for future use.
402		self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
403
404		Ok(minimum_backing_votes)
405	}
406
407	/// Gets or computes the validator-to-group mapping for a session.
408	fn validator_to_group(
409		&mut self,
410		session_index: SessionIndex,
411		validators: &[ValidatorId],
412		validator_groups: &[Vec<ValidatorIndex>],
413	) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
414		let validator_to_group = self
415			.validator_to_group_cache
416			.get_or_insert(session_index, || {
417				let mut vector = vec![None; validators.len()];
418
419				for (group_idx, validator_group) in validator_groups.iter().enumerate() {
420					for validator in validator_group {
421						vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
422					}
423				}
424
425				Arc::new(IndexedVec::<_, _>::from(vector))
426			})
427			.expect("Just inserted");
428
429		Arc::clone(validator_to_group)
430	}
431}
432
433/// The state of the subsystem.
434struct State {
435	/// The utility for managing the implicit and explicit views in a consistent way.
436	implicit_view: ImplicitView,
437	/// State tracked for all scheduling-parents backing work is ongoing for. This includes
438	/// all active leaves.
439	per_scheduling_parent: HashMap<Hash, PerSchedulingParentState>,
440	/// State tracked for all candidates relevant to the implicit view.
441	///
442	/// This is guaranteed to have an entry for each candidate with a scheduling parent in the
443	/// implicit or explicit view for which a `Seconded` statement has been successfully imported.
444	per_candidate: HashMap<CandidateHash, PerCandidateState>,
445	/// A local cache for storing per-session data. This cache helps to
446	/// reduce repeated calls to the runtime and avoid redundant computations.
447	per_session_cache: PerSessionCache,
448	/// A clonable sender which is dispatched to background candidate validation tasks to inform
449	/// the main task of the result.
450	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
451	/// The handle to the keystore used for signing.
452	keystore: KeystorePtr,
453	/// Monotonic flag: set to `true` once a **finalized** block is observed whose
454	/// session has the `CandidateReceiptV3` node feature enabled. Once set, never
455	/// unset. Used for V3 gating checks in backing — if V3 was never seen, reject
456	/// V3 candidates and candidates where old/new version detection disagrees.
457	///
458	/// Backing uses finalized blocks (rather than any active leaf) to ensure that
459	/// new backers do not start producing V3 candidates before a supermajority of
460	/// validators (including approval checkers) are aware of the feature. Approval
461	/// and dispute validation use active leaves instead, so they always transition
462	/// *before* backing does — the safe direction.
463	v3_ever_seen: bool,
464}
465
466impl State {
467	fn new(
468		background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
469		keystore: KeystorePtr,
470	) -> Self {
471		State {
472			implicit_view: ImplicitView::default(),
473			per_scheduling_parent: HashMap::default(),
474			per_candidate: HashMap::new(),
475			per_session_cache: PerSessionCache::default(),
476			background_validation_tx,
477			keystore,
478			v3_ever_seen: false,
479		}
480	}
481}
482
483#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
484async fn run<Context>(
485	mut ctx: Context,
486	keystore: KeystorePtr,
487	metrics: Metrics,
488) -> FatalResult<()> {
489	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
490	let mut state = State::new(background_validation_tx, keystore);
491
492	loop {
493		let res =
494			run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
495
496		match res {
497			Ok(()) => break,
498			Err(e) => crate::error::log_error(Err(e))?,
499		}
500	}
501
502	Ok(())
503}
504
505#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
506async fn run_iteration<Context>(
507	ctx: &mut Context,
508	state: &mut State,
509	metrics: &Metrics,
510	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
511) -> Result<(), Error> {
512	loop {
513		futures::select!(
514			validated_command = background_validation_rx.next().fuse() => {
515				if let Some((scheduling_parent, command)) = validated_command {
516					handle_validated_candidate_command(
517						&mut *ctx,
518						state,
519						scheduling_parent,
520						command,
521						metrics,
522					).await?;
523				} else {
524					panic!("background_validation_tx always alive at this point; qed");
525				}
526			}
527			from_overseer = ctx.recv().fuse() => {
528				match from_overseer.map_err(Error::OverseerExited)? {
529					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
530						handle_active_leaves_update(
531							&mut *ctx,
532							update,
533							state,
534						).await?;
535					}
536					FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _number)) => {
537						if !state.v3_ever_seen {
538							check_v3_on_finalized(&mut *ctx, state, hash).await?;
539						}
540					}
541					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
542					FromOrchestra::Communication { msg } => {
543						handle_communication(&mut *ctx, state, msg, metrics).await?;
544					}
545				}
546			}
547		)
548	}
549}
550
551/// In case a backing validator does not provide a PoV, we need to retry with other backing
552/// validators.
553///
554/// This is the data needed to accomplish this. Basically all the data needed for spawning a
555/// validation job and a list of backing validators, we can try.
556#[derive(Clone)]
557struct AttestingData {
558	/// The candidate to attest.
559	candidate: CandidateReceipt,
560	/// Hash of the PoV we need to fetch.
561	pov_hash: Hash,
562	/// Validator we are currently trying to get the PoV from.
563	from_validator: ValidatorIndex,
564	/// Other backing validators we can try in case `from_validator` failed.
565	backing: Vec<ValidatorIndex>,
566}
567
568#[derive(Default, Debug)]
569struct TableContext {
570	validator: Option<Validator>,
571	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
572	validators: Vec<ValidatorId>,
573	disabled_validators: Vec<ValidatorIndex>,
574}
575
576impl TableContext {
577	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
578	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
579		self.disabled_validators
580			.iter()
581			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
582	}
583
584	// Returns `true` if the local validator is in the disabled validators list
585	pub fn local_validator_is_disabled(&self) -> Option<bool> {
586		self.validator.as_ref().map(|v| v.disabled())
587	}
588}
589
590impl TableContextTrait for TableContext {
591	type AuthorityId = ValidatorIndex;
592	type Digest = CandidateHash;
593	type GroupId = CoreIndex;
594	type Signature = ValidatorSignature;
595	type Candidate = CommittedCandidateReceipt;
596
597	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
598		candidate.hash()
599	}
600
601	fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
602		self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
603	}
604
605	fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
606		self.groups.get(group).map(|g| g.len())
607	}
608}
609
610// It looks like it's not possible to do an `impl From` given the current state of
611// the code. So this does the necessary conversion.
612fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
613	let statement = match s.payload() {
614		StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
615		StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
616	};
617
618	TableSignedStatement {
619		statement,
620		signature: s.signature().clone(),
621		sender: s.validator_index(),
622	}
623}
624
625fn table_attested_to_backed(
626	attested: TableAttestedCandidate<
627		CoreIndex,
628		CommittedCandidateReceipt,
629		ValidatorIndex,
630		ValidatorSignature,
631	>,
632	table_context: &TableContext,
633) -> Option<BackedCandidate> {
634	let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
635
636	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
637		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
638
639	let group = table_context.groups.get(&core_index)?;
640
641	let mut validator_indices = BitVec::with_capacity(group.len());
642
643	validator_indices.resize(group.len(), false);
644
645	// The order of the validity votes in the backed candidate must match
646	// the order of bits set in the bitfield, which is not necessarily
647	// the order of the `validity_votes` we got from the table.
648	let mut vote_positions = Vec::with_capacity(validity_votes.len());
649	for (orig_idx, id) in ids.iter().enumerate() {
650		if let Some(position) = group.iter().position(|x| x == id) {
651			validator_indices.set(position, true);
652			vote_positions.push((orig_idx, position));
653		} else {
654			gum::warn!(
655				target: LOG_TARGET,
656				"Logic error: Validity vote from table does not correspond to group",
657			);
658
659			return None;
660		}
661	}
662	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
663
664	Some(BackedCandidate::new(
665		candidate,
666		vote_positions
667			.into_iter()
668			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
669			.collect(),
670		validator_indices,
671		core_index,
672	))
673}
674
675async fn store_available_data(
676	sender: &mut impl overseer::CandidateBackingSenderTrait,
677	n_validators: u32,
678	candidate_hash: CandidateHash,
679	available_data: AvailableData,
680	expected_erasure_root: Hash,
681	core_index: CoreIndex,
682	node_features: NodeFeatures,
683) -> Result<(), Error> {
684	let (tx, rx) = oneshot::channel();
685	// Important: the `av-store` subsystem will check if the erasure root of the `available_data`
686	// matches `expected_erasure_root` which was provided by the collator in the `CandidateReceipt`.
687	// This check is consensus critical and the `backing` subsystem relies on it for ensuring
688	// candidate validity.
689	sender
690		.send_message(AvailabilityStoreMessage::StoreAvailableData {
691			candidate_hash,
692			n_validators,
693			available_data,
694			expected_erasure_root,
695			core_index,
696			node_features,
697			tx,
698		})
699		.await;
700
701	rx.await
702		.map_err(Error::StoreAvailableDataChannel)?
703		.map_err(Error::StoreAvailableData)
704}
705
706// Make a `PoV` available.
707//
708// This calls the AV store to write the available data to storage. The AV store also checks the
709// erasure root matches the `expected_erasure_root`.
710// This returns `Err()` on erasure root mismatch or due to any AV store subsystem error.
711//
712// Otherwise, it returns `Ok(())`.
713async fn make_pov_available(
714	sender: &mut impl overseer::CandidateBackingSenderTrait,
715	n_validators: usize,
716	pov: Arc<PoV>,
717	candidate_hash: CandidateHash,
718	validation_data: PersistedValidationData,
719	expected_erasure_root: Hash,
720	core_index: CoreIndex,
721	node_features: NodeFeatures,
722) -> Result<(), Error> {
723	store_available_data(
724		sender,
725		n_validators as u32,
726		candidate_hash,
727		AvailableData { pov, validation_data },
728		expected_erasure_root,
729		core_index,
730		node_features,
731	)
732	.await
733}
734
735async fn request_pov(
736	sender: &mut impl overseer::CandidateBackingSenderTrait,
737	relay_parent: Hash,
738	from_validator: ValidatorIndex,
739	para_id: ParaId,
740	candidate_hash: CandidateHash,
741	pov_hash: Hash,
742) -> Result<Arc<PoV>, Error> {
743	let (tx, rx) = oneshot::channel();
744	sender
745		.send_message(AvailabilityDistributionMessage::FetchPoV {
746			relay_parent,
747			from_validator,
748			para_id,
749			candidate_hash,
750			pov_hash,
751			tx,
752		})
753		.await;
754
755	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
756	Ok(Arc::new(pov))
757}
758
759async fn request_candidate_validation(
760	sender: &mut impl overseer::CandidateBackingSenderTrait,
761	validation_data: PersistedValidationData,
762	validation_code: ValidationCode,
763	candidate_receipt: CandidateReceipt,
764	pov: Arc<PoV>,
765	executor_params: ExecutorParams,
766) -> Result<ValidationResult, Error> {
767	let (tx, rx) = oneshot::channel();
768	let is_system = candidate_receipt.descriptor.para_id().is_system();
769	// PVF execution uses this for pruning obsolete jobs - we need the scheduling parent here:
770	let scheduling_parent = candidate_receipt.descriptor.scheduling_parent();
771
772	sender
773		.send_message(CandidateValidationMessage::ValidateFromExhaustive {
774			validation_data,
775			validation_code,
776			candidate_receipt,
777			pov,
778			executor_params,
779			exec_kind: if is_system {
780				PvfExecKind::BackingSystemParas(scheduling_parent)
781			} else {
782				PvfExecKind::Backing(scheduling_parent)
783			},
784			response_sender: tx,
785		})
786		.await;
787
788	match rx.await {
789		Ok(Ok(validation_result)) => Ok(validation_result),
790		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
791		Err(err) => Err(Error::ValidateFromExhaustive(err)),
792	}
793}
794
795struct BackgroundValidationOutputs {
796	candidate: CandidateReceipt,
797	commitments: CandidateCommitments,
798	persisted_validation_data: PersistedValidationData,
799}
800
801type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
802
803struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
804	sender: S,
805	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
806	candidate: CandidateReceipt,
807	/// The scheduling parent hash. Used as context for runtime API queries and as
808	/// the key for `per_scheduling_parent` lookup when sending results back.
809	/// For V1/V2, this equals the candidate's relay_parent.
810	scheduling_parent: Hash,
811	node_features: NodeFeatures,
812	executor_params: Arc<ExecutorParams>,
813	persisted_validation_data: PersistedValidationData,
814	pov: PoVData,
815	n_validators: usize,
816	make_command: F,
817}
818
819async fn validate_and_make_available(
820	params: BackgroundValidationParams<
821		impl overseer::CandidateBackingSenderTrait,
822		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
823	>,
824	core_index: CoreIndex,
825) -> Result<(), Error> {
826	let BackgroundValidationParams {
827		mut sender,
828		mut tx_command,
829		candidate,
830		scheduling_parent,
831		node_features,
832		executor_params,
833		persisted_validation_data,
834		pov,
835		n_validators,
836		make_command,
837	} = params;
838
839	let validation_code = {
840		let validation_code_hash = candidate.descriptor().validation_code_hash();
841		let (tx, rx) = oneshot::channel();
842		sender
843			.send_message(RuntimeApiMessage::Request(
844				scheduling_parent,
845				RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
846			))
847			.await;
848
849		let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
850		match code {
851			Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
852			Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
853			Ok(Some(c)) => c,
854		}
855	};
856
857	let pov = match pov {
858		PoVData::Ready(pov) => pov,
859		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
860			match request_pov(
861				&mut sender,
862				scheduling_parent,
863				from_validator,
864				candidate.descriptor.para_id(),
865				candidate_hash,
866				pov_hash,
867			)
868			.await
869			{
870				Err(Error::FetchPoV) => {
871					tx_command
872						.send((
873							scheduling_parent,
874							ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
875						))
876						.await
877						.map_err(Error::BackgroundValidationMpsc)?;
878					return Ok(());
879				},
880				Err(err) => return Err(err),
881				Ok(pov) => pov,
882			}
883		},
884	};
885
886	let v = {
887		request_candidate_validation(
888			&mut sender,
889			persisted_validation_data,
890			validation_code,
891			candidate.clone(),
892			pov.clone(),
893			executor_params.as_ref().clone(),
894		)
895		.await?
896	};
897
898	let res = match v {
899		ValidationResult::Valid(commitments, validation_data) => {
900			gum::debug!(
901				target: LOG_TARGET,
902				candidate_hash = ?candidate.hash(),
903				"Validation successful",
904			);
905
906			let erasure_valid = make_pov_available(
907				&mut sender,
908				n_validators,
909				pov.clone(),
910				candidate.hash(),
911				validation_data.clone(),
912				candidate.descriptor.erasure_root(),
913				core_index,
914				node_features,
915			)
916			.await;
917
918			match erasure_valid {
919				Ok(()) => Ok(BackgroundValidationOutputs {
920					candidate,
921					commitments,
922					persisted_validation_data: validation_data,
923				}),
924				Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
925					gum::debug!(
926						target: LOG_TARGET,
927						candidate_hash = ?candidate.hash(),
928						actual_commitments = ?commitments,
929						"Erasure root doesn't match the announced by the candidate receipt",
930					);
931					Err(candidate)
932				},
933				// Bubble up any other error.
934				Err(e) => return Err(e),
935			}
936		},
937		ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
938			// If validation produces a new set of commitments, we vote the candidate as invalid.
939			gum::warn!(
940				target: LOG_TARGET,
941				candidate_hash = ?candidate.hash(),
942				"Validation yielded different commitments",
943			);
944			Err(candidate)
945		},
946		ValidationResult::Invalid(reason) => {
947			gum::warn!(
948				target: LOG_TARGET,
949				candidate_hash = ?candidate.hash(),
950				reason = ?reason,
951				"Validation yielded an invalid candidate",
952			);
953			Err(candidate)
954		},
955	};
956
957	tx_command
958		.send((scheduling_parent, make_command(res)))
959		.await
960		.map_err(Into::into)
961}
962
963#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
964async fn handle_communication<Context>(
965	ctx: &mut Context,
966	state: &mut State,
967	message: CandidateBackingMessage,
968	metrics: &Metrics,
969) -> Result<(), Error> {
970	match message {
971		CandidateBackingMessage::Second { scheduling_parent: _, candidate, pvd, pov } => {
972			handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
973		},
974		CandidateBackingMessage::Statement { scheduling_parent, statement } => {
975			handle_statement_message(ctx, state, scheduling_parent, statement, metrics).await?;
976		},
977		CandidateBackingMessage::GetBackableCandidates { candidates, sender } => {
978			handle_get_backable_candidates_message(state, candidates, sender, metrics)?
979		},
980		CandidateBackingMessage::CanSecond(request, tx) => {
981			handle_can_second_request(ctx, state, request, tx).await
982		},
983	}
984
985	Ok(())
986}
987
988#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
989async fn handle_active_leaves_update<Context>(
990	ctx: &mut Context,
991	update: ActiveLeavesUpdate,
992	state: &mut State,
993) -> Result<(), Error> {
994	// Activate in implicit view before deactivate, per the docs
995	// on ImplicitView, this is more efficient.
996	let res = if let Some(leaf) = update.activated {
997		let leaf_hash = leaf.hash;
998		Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
999	} else {
1000		None
1001	};
1002
1003	for deactivated in update.deactivated {
1004		state.implicit_view.deactivate_leaf(deactivated);
1005	}
1006
1007	// clean up `per_scheduling_parent` according to ancestry
1008	// of leaves. we do this so we can clean up candidates right after
1009	// as a result.
1010	{
1011		let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
1012
1013		state.per_scheduling_parent.retain(|r, _| remaining.contains(&r));
1014	}
1015
1016	// clean up `per_candidate` according to which relay-parents
1017	// are known.
1018	state
1019		.per_candidate
1020		.retain(|_, pc| state.per_scheduling_parent.contains_key(&pc.scheduling_parent));
1021
1022	// Get relay parents which might be fresh but might be known already
1023	// that are explicit or implicit from the new active leaf.
1024	let fresh_relay_parents = match res {
1025		None => return Ok(()),
1026		Some((leaf, Ok(_))) => {
1027			let fresh_relay_parents =
1028				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash);
1029
1030			let fresh_relay_parent = match fresh_relay_parents {
1031				Some(f) => f.to_vec(),
1032				None => {
1033					gum::warn!(
1034						target: LOG_TARGET,
1035						leaf_hash = ?leaf.hash,
1036						"Implicit view gave no relay-parents"
1037					);
1038
1039					vec![leaf.hash]
1040				},
1041			};
1042			fresh_relay_parent
1043		},
1044		Some((leaf, Err(e))) => {
1045			gum::debug!(
1046				target: LOG_TARGET,
1047				leaf_hash = ?leaf.hash,
1048				err = ?e,
1049				"Failed to load implicit view for leaf."
1050			);
1051
1052			return Ok(());
1053		},
1054	};
1055
1056	// add entries in `per_scheduling_parent`. for all new relay-parents.
1057	for maybe_new in fresh_relay_parents {
1058		if state.per_scheduling_parent.contains_key(&maybe_new) {
1059			continue;
1060		}
1061
1062		// construct a `PerSchedulingParent` from the runtime API
1063		// and insert it.
1064		let per = construct_per_scheduling_parent_state(
1065			ctx,
1066			maybe_new,
1067			&state.keystore,
1068			&mut state.per_session_cache,
1069		)
1070		.await?;
1071
1072		if let Some(per) = per {
1073			state.per_scheduling_parent.insert(maybe_new, per);
1074		}
1075	}
1076
1077	Ok(())
1078}
1079
1080/// Check whether the `CandidateReceiptV3` node feature is enabled at the session
1081/// of the given finalized block. Sets `state.v3_ever_seen` if so.
1082#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1083async fn check_v3_on_finalized<Context>(
1084	ctx: &mut Context,
1085	state: &mut State,
1086	finalized_hash: Hash,
1087) -> Result<(), Error> {
1088	let session_index = request_session_index_for_child(finalized_hash, ctx.sender())
1089		.await
1090		.await
1091		.map_err(runtime::Error::from)?
1092		.map_err(runtime::Error::from)?;
1093
1094	let node_features = state
1095		.per_session_cache
1096		.node_features(session_index, finalized_hash, ctx.sender())
1097		.await
1098		.map_err(runtime::Error::from)?;
1099
1100	if FeatureIndex::CandidateReceiptV3.is_set(&node_features) {
1101		gum::info!(
1102			target: LOG_TARGET,
1103			?session_index,
1104			"CandidateReceiptV3 node feature detected in finalized block, \
1105		 enabling V3 candidate support",
1106		);
1107		state.v3_ever_seen = true;
1108	}
1109
1110	Ok(())
1111}
1112
1113macro_rules! try_runtime_api {
1114	($x: expr) => {
1115		match $x {
1116			Ok(x) => x,
1117			Err(err) => {
1118				// Only bubble up fatal errors.
1119				error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;
1120
1121				// We can't do candidate validation work if we don't have the
1122				// requisite runtime API data. But these errors should not take
1123				// down the node.
1124				return Ok(None);
1125			},
1126		}
1127	};
1128}
1129
1130fn core_index_from_statement(
1131	sp_state: &PerSchedulingParentState,
1132	statement: &SignedFullStatementWithPVD,
1133) -> Option<CoreIndex> {
1134	let compact_statement = statement.as_unchecked();
1135	let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1136
1137	gum::trace!(
1138		target: LOG_TARGET,
1139		group_rotation_info = ?sp_state.group_rotation_info,
1140		?statement,
1141		validator_to_group = ?sp_state.validator_to_group,
1142		n_cores = sp_state.n_cores,
1143		?candidate_hash,
1144		"Extracting core index from statement"
1145	);
1146
1147	let statement_validator_index = statement.validator_index();
1148	let Some(Some(group_index)) = sp_state.validator_to_group.get(statement_validator_index) else {
1149		gum::debug!(
1150			target: LOG_TARGET,
1151			group_rotation_info = ?sp_state.group_rotation_info,
1152			?statement,
1153			validator_to_group = ?sp_state.validator_to_group,
1154			n_cores = sp_state.n_cores,
1155			?candidate_hash,
1156			"Invalid validator index: {:?}",
1157			statement_validator_index
1158		);
1159		return None;
1160	};
1161
1162	// First check if the statement para id matches the core assignment.
1163	let core_index =
1164		sp_state.group_rotation_info.core_for_group(*group_index, sp_state.n_cores as _);
1165
1166	if core_index.0 > sp_state.n_cores {
1167		gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores = sp_state.n_cores, "Invalid CoreIndex");
1168		return None;
1169	}
1170
1171	if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1172		let candidate_para_id = candidate.descriptor.para_id();
1173		let mut assigned_paras = sp_state.claim_queue.iter_claims_for_core(&core_index);
1174
1175		if !assigned_paras.any(|id| id == &candidate_para_id) {
1176			gum::debug!(
1177				target: LOG_TARGET,
1178				?candidate_hash,
1179				?core_index,
1180				assigned_paras = ?sp_state.claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1181				?candidate_para_id,
1182				"Invalid CoreIndex, core is not assigned to this para_id"
1183			);
1184			return None;
1185		}
1186		Some(core_index)
1187	} else {
1188		Some(core_index)
1189	}
1190}
1191
1192/// Load the data necessary to do backing work on top of a relay-parent.
1193#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1194async fn construct_per_scheduling_parent_state<Context>(
1195	ctx: &mut Context,
1196	relay_parent: Hash,
1197	keystore: &KeystorePtr,
1198	per_session_cache: &mut PerSessionCache,
1199) -> Result<Option<PerSchedulingParentState>, Error> {
1200	let parent = relay_parent;
1201
1202	let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1203		request_session_index_for_child(parent, ctx.sender()).await,
1204		request_validator_groups(parent, ctx.sender()).await,
1205		request_claim_queue(parent, ctx.sender()).await,
1206		request_disabled_validators(parent, ctx.sender()).await,
1207	)
1208	.map_err(Error::JoinMultiple)?;
1209
1210	let session_index = try_runtime_api!(session_index);
1211
1212	let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1213	let validators = try_runtime_api!(validators);
1214
1215	let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1216	let node_features = try_runtime_api!(node_features);
1217
1218	gum::debug!(target: LOG_TARGET, ?parent, "New state");
1219
1220	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1221
1222	let minimum_backing_votes = per_session_cache
1223		.minimum_backing_votes(session_index, parent, ctx.sender())
1224		.await;
1225	let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1226	let claim_queue = try_runtime_api!(claim_queue);
1227	let disabled_validators = try_runtime_api!(disabled_validators);
1228
1229	let signing_context = SigningContext { parent_hash: parent, session_index };
1230	let validator = match Validator::construct(
1231		&validators,
1232		&disabled_validators,
1233		signing_context.clone(),
1234		keystore.clone(),
1235	) {
1236		Ok(v) => Some(v),
1237		Err(util::Error::NotAValidator) => None,
1238		Err(e) => {
1239			gum::warn!(
1240				target: LOG_TARGET,
1241				err = ?e,
1242				"Cannot participate in candidate backing",
1243			);
1244
1245			return Ok(None);
1246		},
1247	};
1248
1249	let n_cores = validator_groups.len();
1250
1251	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1252	let mut assigned_core = None;
1253
1254	for idx in 0..n_cores {
1255		let core_index = CoreIndex(idx as _);
1256
1257		if !claim_queue.contains_key(&core_index) {
1258			continue;
1259		}
1260
1261		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1262		if let Some(g) = validator_groups.get(group_index.0 as usize) {
1263			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1264				assigned_core = Some(core_index);
1265			}
1266			groups.insert(core_index, g.clone());
1267		}
1268	}
1269	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1270
1271	let validator_to_group =
1272		per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1273
1274	let table_context =
1275		TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1276
1277	Ok(Some(PerSchedulingParentState {
1278		parent,
1279		node_features,
1280		assigned_core,
1281		backed: HashSet::new(),
1282		table: Table::new(),
1283		table_context,
1284		issued_statements: HashSet::new(),
1285		awaiting_validation: HashSet::new(),
1286		fallbacks: HashMap::new(),
1287		minimum_backing_votes,
1288		n_cores: validator_groups.len() as u32,
1289		claim_queue: ClaimQueueSnapshot::from(claim_queue),
1290		validator_to_group,
1291		session_index,
1292		group_rotation_info,
1293	}))
1294}
1295
1296enum SecondingAllowed {
1297	No,
1298	// On which leaves is seconding allowed.
1299	Yes(Vec<Hash>),
1300}
1301
1302/// Checks whether a candidate can be seconded based on its hypothetical membership in the fragment
1303/// chain.
1304#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1305async fn seconding_sanity_check<Context>(
1306	ctx: &mut Context,
1307	implicit_view: &ImplicitView,
1308	hypothetical_candidate: HypotheticalCandidate,
1309) -> SecondingAllowed {
1310	let mut leaves_for_seconding = Vec::new();
1311	let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1312
1313	// Scheduling context: the scheduling parent determines which leaves this candidate
1314	// can be seconded under (it must be in the allowed ancestry).
1315	let candidate_scheduling_parent = hypothetical_candidate.scheduling_parent();
1316	let candidate_hash = hypothetical_candidate.candidate_hash();
1317
1318	for head in implicit_view.leaves() {
1319		// Check that the candidate scheduling parent is allowed under this leaf.
1320		let allowed_parents_for_para = implicit_view.known_allowed_relay_parents_under(head);
1321		if !allowed_parents_for_para
1322			.unwrap_or_default()
1323			.contains(&candidate_scheduling_parent)
1324		{
1325			continue;
1326		}
1327
1328		let (tx, rx) = oneshot::channel();
1329		ctx.send_message(ProspectiveParachainsMessage::GetHypotheticalMembership(
1330			HypotheticalMembershipRequest {
1331				candidates: vec![hypothetical_candidate.clone()],
1332				fragment_chain_relay_parent: Some(*head),
1333			},
1334			tx,
1335		))
1336		.await;
1337		let response = rx.map_ok(move |candidate_memberships| {
1338			let is_member_or_potential = candidate_memberships
1339				.into_iter()
1340				.find_map(|(candidate, leaves)| {
1341					(candidate.candidate_hash() == candidate_hash).then_some(leaves)
1342				})
1343				.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1344				.is_some();
1345
1346			(is_member_or_potential, head)
1347		});
1348		responses.push_back(response.boxed());
1349	}
1350
1351	if responses.is_empty() {
1352		return SecondingAllowed::No;
1353	}
1354
1355	while let Some(response) = responses.next().await {
1356		match response {
1357			Err(oneshot::Canceled) => {
1358				gum::warn!(
1359					target: LOG_TARGET,
1360					"Failed to reach prospective parachains subsystem for hypothetical membership",
1361				);
1362
1363				return SecondingAllowed::No;
1364			},
1365			Ok((is_member_or_potential, head)) => match is_member_or_potential {
1366				false => {
1367					gum::debug!(
1368						target: LOG_TARGET,
1369						?candidate_hash,
1370						leaf_hash = ?head,
1371						"Refusing to second candidate at leaf. Is not a potential member.",
1372					);
1373				},
1374				true => {
1375					leaves_for_seconding.push(*head);
1376				},
1377			},
1378		}
1379	}
1380
1381	if leaves_for_seconding.is_empty() {
1382		SecondingAllowed::No
1383	} else {
1384		SecondingAllowed::Yes(leaves_for_seconding)
1385	}
1386}
1387
1388/// Performs seconding sanity check for an advertisement.
1389#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1390async fn handle_can_second_request<Context>(
1391	ctx: &mut Context,
1392	state: &State,
1393	request: CanSecondRequest,
1394	tx: oneshot::Sender<bool>,
1395) {
1396	let scheduling_parent = request.candidate_scheduling_parent;
1397	let response = if state.per_scheduling_parent.get(&scheduling_parent).is_some() {
1398		let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1399			candidate_hash: request.candidate_hash,
1400			candidate_para: request.candidate_para_id,
1401			parent_head_data_hash: request.parent_head_data_hash,
1402			candidate_scheduling_parent: scheduling_parent,
1403		};
1404
1405		let result =
1406			seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1407
1408		match result {
1409			SecondingAllowed::No => false,
1410			SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1411		}
1412	} else {
1413		// Relay parent is unknown or async backing is disabled.
1414		false
1415	};
1416
1417	let _ = tx.send(response);
1418}
1419
1420/// Determine the session for executor_params lookup and fetch executor_params.
1421///
1422/// For V2/V3, session_index is in the descriptor. For V1, scheduling_parent ==
1423/// relay_parent, so `sp_state.session_index` is the relay_parent's session.
1424///
1425/// Note: We use the scheduling parent (`sp_state.parent`) rather than the relay parent for
1426/// the runtime API fetch. While the relay parent is the relevant parent for the execution
1427/// environment, we only need the session index here (which is already determined). Using
1428/// the relay parent would fail for old relay parents whose state may have been pruned. The
1429/// scheduling parent is guaranteed to never be older than the relay parent and is always a
1430/// recent relay chain block, making it safe for fetching session-indexed data.
1431async fn get_executor_params(
1432	per_session_cache: &mut PerSessionCache,
1433	descriptor: &CandidateDescriptorV2,
1434	sp_state: &PerSchedulingParentState,
1435	sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
1436) -> Result<Arc<ExecutorParams>, Error> {
1437	let session = descriptor.session_index().unwrap_or(sp_state.session_index);
1438	per_session_cache
1439		.executor_params(session, sp_state.parent, sender)
1440		.await
1441		.map_err(|e| Error::UtilError(UtilError::RuntimeApi(e)))
1442}
1443
1444#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1445async fn handle_validated_candidate_command<Context>(
1446	ctx: &mut Context,
1447	state: &mut State,
1448	scheduling_parent: Hash,
1449	command: ValidatedCandidateCommand,
1450	metrics: &Metrics,
1451) -> Result<(), Error> {
1452	match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1453		Some(sp_state) => {
1454			let candidate_hash = command.candidate_hash();
1455			sp_state.awaiting_validation.remove(&candidate_hash);
1456
1457			match command {
1458				ValidatedCandidateCommand::Second(res) => match res {
1459					Ok(outputs) => {
1460						let BackgroundValidationOutputs {
1461							candidate,
1462							commitments,
1463							persisted_validation_data,
1464						} = outputs;
1465
1466						if sp_state.issued_statements.contains(&candidate_hash) {
1467							return Ok(());
1468						}
1469
1470						let receipt = CommittedCandidateReceipt {
1471							descriptor: candidate.descriptor.clone(),
1472							commitments,
1473						};
1474
1475						let hypothetical_candidate = HypotheticalCandidate::Complete {
1476							candidate_hash,
1477							receipt: Arc::new(receipt.clone()),
1478							persisted_validation_data: persisted_validation_data.clone(),
1479						};
1480						// sanity check that we're allowed to second the candidate
1481						// and that it doesn't conflict with other candidates we've
1482						// seconded.
1483						if let SecondingAllowed::No = seconding_sanity_check(
1484							ctx,
1485							&state.implicit_view,
1486							hypothetical_candidate,
1487						)
1488						.await
1489						{
1490							return Ok(());
1491						};
1492
1493						let statement =
1494							StatementWithPVD::Seconded(receipt, persisted_validation_data);
1495
1496						// If we get an Error::RejectedByProspectiveParachains,
1497						// then the statement has not been distributed or imported into
1498						// the table.
1499						let res = sign_import_and_distribute_statement(
1500							ctx,
1501							sp_state,
1502							&mut state.per_candidate,
1503							statement,
1504							state.keystore.clone(),
1505							metrics,
1506						)
1507						.await;
1508
1509						if let Err(Error::RejectedByProspectiveParachains) = res {
1510							let candidate_hash = candidate.hash();
1511							gum::debug!(
1512								target: LOG_TARGET,
1513								scheduling_parent = ?candidate.descriptor().scheduling_parent(),
1514								?candidate_hash,
1515								"Attempted to second candidate but was rejected by prospective parachains",
1516							);
1517
1518							ctx.send_message(CollatorProtocolMessage::Invalid(
1519								candidate.descriptor().scheduling_parent(),
1520								candidate,
1521							))
1522							.await;
1523
1524							return Ok(());
1525						}
1526
1527						if let Some(stmt) = res? {
1528							match state.per_candidate.get_mut(&candidate_hash) {
1529								None => {
1530									gum::warn!(
1531										target: LOG_TARGET,
1532										?candidate_hash,
1533										"Missing `per_candidate` for seconded candidate.",
1534									);
1535								},
1536								Some(p) => p.seconded_locally = true,
1537							}
1538
1539							sp_state.issued_statements.insert(candidate_hash);
1540
1541							metrics.on_candidate_seconded();
1542							ctx.send_message(CollatorProtocolMessage::Seconded(
1543								sp_state.parent,
1544								StatementWithPVD::drop_pvd_from_signed(stmt),
1545							))
1546							.await;
1547						}
1548					},
1549					Err(candidate) => {
1550						ctx.send_message(CollatorProtocolMessage::Invalid(
1551							sp_state.parent,
1552							candidate,
1553						))
1554						.await;
1555					},
1556				},
1557				ValidatedCandidateCommand::Attest(res) => {
1558					// We are done - avoid new validation spawns:
1559					sp_state.fallbacks.remove(&candidate_hash);
1560					// sanity check.
1561					if !sp_state.issued_statements.contains(&candidate_hash) {
1562						if res.is_ok() {
1563							let statement = StatementWithPVD::Valid(candidate_hash);
1564
1565							sign_import_and_distribute_statement(
1566								ctx,
1567								sp_state,
1568								&mut state.per_candidate,
1569								statement,
1570								state.keystore.clone(),
1571								metrics,
1572							)
1573							.await?;
1574						}
1575						sp_state.issued_statements.insert(candidate_hash);
1576					}
1577				},
1578				ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1579					if let Some(attesting) = sp_state.fallbacks.get_mut(&candidate_hash) {
1580						if let Some(index) = attesting.backing.pop() {
1581							attesting.from_validator = index;
1582							let attesting = attesting.clone();
1583
1584							// The candidate state should be available because we've
1585							// validated it before, the relay-parent is still around,
1586							// and candidates are pruned on the basis of relay-parents.
1587							//
1588							// If it is not, then no point in validating it anyway.
1589							if let Some(pvd) = state
1590								.per_candidate
1591								.get(&candidate_hash)
1592								.map(|pc| pc.persisted_validation_data.clone())
1593							{
1594								let executor_params = get_executor_params(
1595									&mut state.per_session_cache,
1596									attesting.candidate.descriptor(),
1597									sp_state,
1598									ctx.sender(),
1599								)
1600								.await?;
1601
1602								kick_off_validation_work(
1603									ctx,
1604									sp_state,
1605									pvd,
1606									&state.background_validation_tx,
1607									attesting,
1608									executor_params,
1609								)
1610								.await?;
1611							}
1612						}
1613					} else {
1614						gum::warn!(
1615							target: LOG_TARGET,
1616							"AttestNoPoV was triggered without fallback being available."
1617						);
1618						debug_assert!(false);
1619					}
1620				},
1621			}
1622		},
1623		None => {
1624			// simple race condition; can be ignored = this relay-parent
1625			// is no longer relevant.
1626		},
1627	}
1628
1629	Ok(())
1630}
1631
1632fn sign_statement(
1633	sp_state: &PerSchedulingParentState,
1634	statement: StatementWithPVD,
1635	keystore: KeystorePtr,
1636	metrics: &Metrics,
1637) -> Option<SignedFullStatementWithPVD> {
1638	let signed = sp_state
1639		.table_context
1640		.validator
1641		.as_ref()?
1642		.sign(keystore, statement)
1643		.ok()
1644		.flatten()?;
1645	metrics.on_statement_signed();
1646	Some(signed)
1647}
1648
1649/// Import a statement into the statement table and return the summary of the import.
1650///
1651/// This will fail with `Error::RejectedByProspectiveParachains` if the message type is seconded,
1652/// the candidate is fresh, and any of the following are true:
1653/// 1. There is no `PersistedValidationData` attached.
1654/// 2. Prospective parachains subsystem returned an empty `HypotheticalMembership` i.e. did not
1655///    recognize the candidate as being applicable to any of the active leaves.
1656#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1657async fn import_statement<Context>(
1658	ctx: &mut Context,
1659	sp_state: &mut PerSchedulingParentState,
1660	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1661	statement: &SignedFullStatementWithPVD,
1662) -> Result<Option<TableSummary>, Error> {
1663	let candidate_hash = statement.payload().candidate_hash();
1664
1665	gum::debug!(
1666		target: LOG_TARGET,
1667		statement = ?statement.payload().to_compact(),
1668		validator_index = statement.validator_index().0,
1669		?candidate_hash,
1670		"Importing statement",
1671	);
1672
1673	// If this is a new candidate (statement is 'seconded' and candidate is unknown),
1674	// we need to create an entry in the `PerCandidateState` map.
1675	//
1676	// We also need to inform the prospective parachains subsystem of the seconded candidate.
1677	// If `ProspectiveParachainsMessage::Second` fails, then we return
1678	// Error::RejectedByProspectiveParachains.
1679	//
1680	// Persisted Validation Data should be available - it may already be available
1681	// if this is a candidate we are seconding.
1682	//
1683	// We should also not accept any candidates which have no valid depths under any of
1684	// our active leaves.
1685	if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1686		if !per_candidate.contains_key(&candidate_hash) {
1687			let (tx, rx) = oneshot::channel();
1688			ctx.send_message(ProspectiveParachainsMessage::IntroduceSecondedCandidate(
1689				IntroduceSecondedCandidateRequest {
1690					candidate_para: candidate.descriptor.para_id(),
1691					candidate_receipt: candidate.clone(),
1692					persisted_validation_data: pvd.clone(),
1693				},
1694				tx,
1695			))
1696			.await;
1697
1698			match rx.await {
1699				Err(oneshot::Canceled) => {
1700					gum::warn!(
1701						target: LOG_TARGET,
1702						"Could not reach the Prospective Parachains subsystem."
1703					);
1704
1705					return Err(Error::RejectedByProspectiveParachains);
1706				},
1707				Ok(false) => return Err(Error::RejectedByProspectiveParachains),
1708				Ok(true) => {},
1709			}
1710
1711			// Only save the candidate if it was approved by prospective parachains.
1712			per_candidate.insert(
1713				candidate_hash,
1714				PerCandidateState {
1715					persisted_validation_data: pvd.clone(),
1716					// This is set after importing when seconding locally.
1717					seconded_locally: false,
1718					// Scheduling context: used for cleanup against per_scheduling_parent.
1719					scheduling_parent: candidate.descriptor.scheduling_parent(),
1720				},
1721			);
1722		}
1723	}
1724
1725	let stmt = primitive_statement_to_table(statement);
1726
1727	let core = core_index_from_statement(sp_state, statement).ok_or(Error::CoreIndexUnavailable)?;
1728
1729	Ok(sp_state.table.import_statement(&sp_state.table_context, core, stmt))
1730}
1731
1732/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
1733/// misbehaviors as a result of importing a statement.
1734#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1735async fn post_import_statement_actions<Context>(
1736	ctx: &mut Context,
1737	sp_state: &mut PerSchedulingParentState,
1738	summary: Option<&TableSummary>,
1739) {
1740	if let Some(attested) = summary.as_ref().and_then(|s| {
1741		sp_state.table.attested_candidate(
1742			&s.candidate,
1743			&sp_state.table_context,
1744			sp_state.minimum_backing_votes,
1745		)
1746	}) {
1747		let candidate_hash = attested.candidate.hash();
1748
1749		// `HashSet::insert` returns true if the thing wasn't in there already.
1750		if sp_state.backed.insert(candidate_hash) {
1751			if let Some(backed) = table_attested_to_backed(attested, &sp_state.table_context) {
1752				let para_id = backed.candidate().descriptor.para_id();
1753				gum::debug!(
1754					target: LOG_TARGET,
1755					candidate_hash = ?candidate_hash,
1756					scheduling_parent = ?sp_state.parent,
1757					%para_id,
1758					"Candidate backed",
1759				);
1760
1761				// Inform the prospective parachains subsystem
1762				// that the candidate is now backed.
1763				ctx.send_message(ProspectiveParachainsMessage::CandidateBacked(
1764					para_id,
1765					candidate_hash,
1766				))
1767				.await;
1768				// Notify statement distribution of backed candidate.
1769				ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1770			} else {
1771				gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1772			}
1773		} else {
1774			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1775		}
1776	} else {
1777		gum::debug!(target: LOG_TARGET, "No attested candidate");
1778	}
1779
1780	issue_new_misbehaviors(ctx, sp_state.parent, &mut sp_state.table);
1781}
1782
1783/// Check if there have happened any new misbehaviors and issue necessary messages.
1784#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1785fn issue_new_misbehaviors<Context>(
1786	ctx: &mut Context,
1787	relay_parent: Hash,
1788	table: &mut Table<TableContext>,
1789) {
1790	// collect the misbehaviors to avoid double mutable self borrow issues
1791	let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1792	for (validator_id, report) in misbehaviors {
1793		// The provisioner waits on candidate-backing, which means
1794		// that we need to send unbounded messages to avoid cycles.
1795		//
1796		// Misbehaviors are bounded by the number of validators and
1797		// the block production protocol.
1798		ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1799			relay_parent,
1800			ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1801		));
1802	}
1803}
1804
1805/// Sign, import, and distribute a statement.
1806#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1807async fn sign_import_and_distribute_statement<Context>(
1808	ctx: &mut Context,
1809	sp_state: &mut PerSchedulingParentState,
1810	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1811	statement: StatementWithPVD,
1812	keystore: KeystorePtr,
1813	metrics: &Metrics,
1814) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1815	if let Some(signed_statement) = sign_statement(&*sp_state, statement, keystore, metrics) {
1816		let summary = import_statement(ctx, sp_state, per_candidate, &signed_statement).await?;
1817
1818		// `Share` must always be sent before `Backed`. We send the latter in
1819		// `post_import_statement_action` below.
1820		let smsg = StatementDistributionMessage::Share(sp_state.parent, signed_statement.clone());
1821		ctx.send_unbounded_message(smsg);
1822
1823		post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1824
1825		Ok(Some(signed_statement))
1826	} else {
1827		Ok(None)
1828	}
1829}
1830
1831#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1832async fn background_validate_and_make_available<Context>(
1833	ctx: &mut Context,
1834	sp_state: &mut PerSchedulingParentState,
1835	params: BackgroundValidationParams<
1836		impl overseer::CandidateBackingSenderTrait,
1837		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1838	>,
1839) -> Result<(), Error> {
1840	let candidate_hash = params.candidate.hash();
1841	let Some(core_index) = sp_state.assigned_core else { return Ok(()) };
1842	if sp_state.awaiting_validation.insert(candidate_hash) {
1843		// spawn background task.
1844		let bg = async move {
1845			if let Err(error) = validate_and_make_available(params, core_index).await {
1846				if let Error::BackgroundValidationMpsc(error) = error {
1847					gum::debug!(
1848						target: LOG_TARGET,
1849						?candidate_hash,
1850						?error,
1851						"Mpsc background validation mpsc died during validation- leaf no longer active?"
1852					);
1853				} else {
1854					gum::error!(
1855						target: LOG_TARGET,
1856						?candidate_hash,
1857						?error,
1858						"Failed to validate and make available",
1859					);
1860				}
1861			}
1862		};
1863
1864		ctx.spawn("backing-validation", bg.boxed())
1865			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1866	}
1867
1868	Ok(())
1869}
1870
1871/// Kick off validation work and distribute the result as a signed statement.
1872#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1873async fn kick_off_validation_work<Context>(
1874	ctx: &mut Context,
1875	sp_state: &mut PerSchedulingParentState,
1876	persisted_validation_data: PersistedValidationData,
1877	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1878	attesting: AttestingData,
1879	executor_params: Arc<ExecutorParams>,
1880) -> Result<(), Error> {
1881	// Do nothing if the local validator is disabled or not a validator at all
1882	match sp_state.table_context.local_validator_is_disabled() {
1883		Some(true) => {
1884			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1885			return Ok(());
1886		},
1887		Some(false) => {}, // we are not disabled - move on
1888		None => {
1889			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1890			return Ok(());
1891		},
1892	}
1893
1894	let candidate_hash = attesting.candidate.hash();
1895	if sp_state.issued_statements.contains(&candidate_hash) {
1896		return Ok(());
1897	}
1898
1899	gum::debug!(
1900		target: LOG_TARGET,
1901		candidate_hash = ?candidate_hash,
1902		candidate_receipt = ?attesting.candidate,
1903		"Kicking off validation",
1904	);
1905
1906	let bg_sender = ctx.sender().clone();
1907	let pov = PoVData::FetchFromValidator {
1908		from_validator: attesting.from_validator,
1909		candidate_hash,
1910		pov_hash: attesting.pov_hash,
1911	};
1912	let scheduling_parent = attesting.candidate.descriptor().scheduling_parent();
1913
1914	background_validate_and_make_available(
1915		ctx,
1916		sp_state,
1917		BackgroundValidationParams {
1918			sender: bg_sender,
1919			tx_command: background_validation_tx.clone(),
1920			candidate: attesting.candidate,
1921			scheduling_parent,
1922			node_features: sp_state.node_features.clone(),
1923			executor_params,
1924			persisted_validation_data,
1925			pov,
1926			n_validators: sp_state.table_context.validators.len(),
1927			make_command: ValidatedCandidateCommand::Attest,
1928		},
1929	)
1930	.await
1931}
1932
1933/// Import the statement and kick off validation work if it is a part of our assignment.
1934#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1935async fn maybe_validate_and_import<Context>(
1936	ctx: &mut Context,
1937	state: &mut State,
1938	scheduling_parent: Hash,
1939	statement: SignedFullStatementWithPVD,
1940) -> Result<(), Error> {
1941	let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
1942		Some(r) => r,
1943		None => {
1944			gum::trace!(
1945				target: LOG_TARGET,
1946				?scheduling_parent,
1947				"Received statement for unknown scheduling parent"
1948			);
1949
1950			return Ok(());
1951		},
1952	};
1953
1954	// Don't import statement if the sender is disabled
1955	if sp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1956		gum::debug!(
1957			target: LOG_TARGET,
1958			sender_validator_idx = ?statement.validator_index(),
1959			"Not importing statement because the sender is disabled"
1960		);
1961		return Ok(());
1962	}
1963
1964	// Version consistency + V3 gating for Seconded statements (shared logic).
1965	if let StatementWithPVD::Seconded(receipt, _) = statement.payload() {
1966		if let Err(reason) = receipt.descriptor.check_version_acceptance(state.v3_ever_seen) {
1967			gum::debug!(
1968				target: LOG_TARGET,
1969				?scheduling_parent,
1970				"Not importing Seconded statement: {}",
1971				reason,
1972			);
1973			return Ok(());
1974		}
1975	}
1976
1977	let res = import_statement(ctx, sp_state, &mut state.per_candidate, &statement).await;
1978
1979	// if we get an Error::RejectedByProspectiveParachains,
1980	// we will do nothing.
1981	if let Err(Error::RejectedByProspectiveParachains) = res {
1982		gum::debug!(
1983			target: LOG_TARGET,
1984			?scheduling_parent,
1985			"Statement rejected by prospective parachains."
1986		);
1987
1988		return Ok(());
1989	}
1990
1991	let summary = res?;
1992	post_import_statement_actions(ctx, sp_state, summary.as_ref()).await;
1993
1994	if let Some(summary) = summary {
1995		// import_statement already takes care of communicating with the
1996		// prospective parachains subsystem. At this point, the candidate
1997		// has already been accepted by the subsystem.
1998
1999		let candidate_hash = summary.candidate;
2000
2001		if Some(summary.group_id) != sp_state.assigned_core {
2002			return Ok(());
2003		}
2004
2005		let attesting = match statement.payload() {
2006			StatementWithPVD::Seconded(receipt, _) => {
2007				let attesting = AttestingData {
2008					candidate: sp_state
2009						.table
2010						.get_candidate(&candidate_hash)
2011						.ok_or(Error::CandidateNotFound)?
2012						.to_plain(),
2013					pov_hash: receipt.descriptor.pov_hash(),
2014					from_validator: statement.validator_index(),
2015					backing: Vec::new(),
2016				};
2017				sp_state.fallbacks.insert(summary.candidate, attesting.clone());
2018				attesting
2019			},
2020			StatementWithPVD::Valid(candidate_hash) => {
2021				if let Some(attesting) = sp_state.fallbacks.get_mut(candidate_hash) {
2022					let our_index = sp_state.table_context.validator.as_ref().map(|v| v.index());
2023					if our_index == Some(statement.validator_index()) {
2024						return Ok(());
2025					}
2026
2027					if sp_state.awaiting_validation.contains(candidate_hash) {
2028						// Job already running:
2029						attesting.backing.push(statement.validator_index());
2030						return Ok(());
2031					} else {
2032						// No job, so start another with current validator:
2033						attesting.from_validator = statement.validator_index();
2034						attesting.clone()
2035					}
2036				} else {
2037					return Ok(());
2038				}
2039			},
2040		};
2041
2042		// Skip validation if local validator is disabled or not a validator.
2043		// Check this before fetching executor_params to avoid unnecessary runtime calls.
2044		match sp_state.table_context.local_validator_is_disabled() {
2045			Some(true) => return Ok(()),
2046			None => return Ok(()),
2047			Some(false) => {},
2048		}
2049
2050		// After `import_statement` succeeds, the candidate entry is guaranteed
2051		// to exist.
2052		if let Some(pvd) = state
2053			.per_candidate
2054			.get(&candidate_hash)
2055			.map(|pc| pc.persisted_validation_data.clone())
2056		{
2057			let executor_params = get_executor_params(
2058				&mut state.per_session_cache,
2059				attesting.candidate.descriptor(),
2060				sp_state,
2061				ctx.sender(),
2062			)
2063			.await?;
2064
2065			kick_off_validation_work(
2066				ctx,
2067				sp_state,
2068				pvd,
2069				&state.background_validation_tx,
2070				attesting,
2071				executor_params,
2072			)
2073			.await?;
2074		}
2075	}
2076	Ok(())
2077}
2078
2079/// Kick off background validation with intent to second.
2080#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2081async fn validate_and_second<Context>(
2082	ctx: &mut Context,
2083	sp_state: &mut PerSchedulingParentState,
2084	persisted_validation_data: PersistedValidationData,
2085	candidate: &CandidateReceipt,
2086	pov: Arc<PoV>,
2087	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
2088	executor_params: Arc<ExecutorParams>,
2089) -> Result<(), Error> {
2090	let candidate_hash = candidate.hash();
2091
2092	gum::debug!(
2093		target: LOG_TARGET,
2094		candidate_hash = ?candidate_hash,
2095		candidate_receipt = ?candidate,
2096		"Validate and second candidate",
2097	);
2098
2099	let bg_sender = ctx.sender().clone();
2100	let scheduling_parent = candidate.descriptor.scheduling_parent();
2101	background_validate_and_make_available(
2102		ctx,
2103		sp_state,
2104		BackgroundValidationParams {
2105			sender: bg_sender,
2106			tx_command: background_validation_tx.clone(),
2107			candidate: candidate.clone(),
2108			scheduling_parent,
2109			node_features: sp_state.node_features.clone(),
2110			executor_params,
2111			persisted_validation_data,
2112			pov: PoVData::Ready(pov),
2113			n_validators: sp_state.table_context.validators.len(),
2114			make_command: ValidatedCandidateCommand::Second,
2115		},
2116	)
2117	.await?;
2118
2119	Ok(())
2120}
2121
2122#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2123async fn handle_second_message<Context>(
2124	ctx: &mut Context,
2125	state: &mut State,
2126	candidate: CandidateReceipt,
2127	persisted_validation_data: PersistedValidationData,
2128	pov: PoV,
2129	metrics: &Metrics,
2130) -> Result<(), Error> {
2131	let _timer = metrics.time_process_second();
2132
2133	let candidate_hash = candidate.hash();
2134
2135	if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2136		gum::warn!(
2137			target: LOG_TARGET,
2138			?candidate_hash,
2139			"Candidate backing was asked to second candidate with wrong PVD",
2140		);
2141
2142		return Ok(());
2143	}
2144
2145	// Version consistency + V3 gating (shared logic from primitives).
2146	if let Err(reason) = candidate.descriptor().check_version_acceptance(state.v3_ever_seen) {
2147		gum::debug!(
2148			target: LOG_TARGET,
2149			?candidate_hash,
2150			"Not seconding candidate: {}",
2151			reason,
2152		);
2153		ctx.send_message(CollatorProtocolMessage::Invalid(
2154			candidate.descriptor().scheduling_parent(),
2155			candidate,
2156		))
2157		.await;
2158		return Ok(());
2159	}
2160
2161	// The signing context should use scheduling_parent (for V1/V2, this equals relay_parent)
2162	let scheduling_parent = candidate.descriptor().scheduling_parent();
2163
2164	// Look up the PerSchedulingParentState using scheduling_parent - this is where we'll sign
2165	let sp_state = match state.per_scheduling_parent.get_mut(&scheduling_parent) {
2166		None => {
2167			gum::trace!(
2168				target: LOG_TARGET,
2169				?scheduling_parent,
2170				?candidate_hash,
2171				"Candidate has scheduling_parent outside of our view."
2172			);
2173
2174			return Ok(());
2175		},
2176		Some(r) => r,
2177	};
2178
2179	// Get the scheduling info (assigned_core and claim_queue) from the scheduling_parent state
2180	let assigned_core = sp_state.assigned_core;
2181	let claim_queue = &sp_state.claim_queue;
2182
2183	// Just return if the local validator is disabled. If we are here the local node should be a
2184	// validator but defensively use `unwrap_or(false)` to continue processing in this case.
2185	if sp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2186		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2187		return Ok(());
2188	}
2189
2190	// For V3, use scheduling info from scheduling_parent (claim queue determines assignments)
2191	let assigned_paras = assigned_core.and_then(|core| claim_queue.0.get(&core));
2192
2193	// Sanity check that candidate is from our assignment.
2194	if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2195		gum::debug!(
2196			target: LOG_TARGET,
2197			our_assignment_core = ?assigned_core,
2198			our_assignment_paras = ?assigned_paras,
2199			collation = ?candidate.descriptor().para_id(),
2200			"Subsystem asked to second for para outside of our assignment",
2201		);
2202		return Ok(());
2203	}
2204
2205	gum::debug!(
2206		target: LOG_TARGET,
2207		our_assignment_core = ?assigned_core,
2208		our_assignment_paras = ?assigned_paras,
2209		collation = ?candidate.descriptor().para_id(),
2210		"Current assignments vs collation",
2211	);
2212
2213	// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
2214	// Seconded statement only if we have not signed a Valid statement for the requested candidate.
2215	//
2216	// The actual logic of issuing the signed statement checks that this isn't
2217	// conflicting with other seconded candidates. Not doing that check here
2218	// gives other subsystems the ability to get us to execute arbitrary candidates,
2219	// but no more.
2220	if !sp_state.issued_statements.contains(&candidate_hash) {
2221		let pov = Arc::new(pov);
2222
2223		let executor_params = get_executor_params(
2224			&mut state.per_session_cache,
2225			candidate.descriptor(),
2226			sp_state,
2227			ctx.sender(),
2228		)
2229		.await?;
2230
2231		validate_and_second(
2232			ctx,
2233			sp_state,
2234			persisted_validation_data,
2235			&candidate,
2236			pov,
2237			&state.background_validation_tx,
2238			executor_params,
2239		)
2240		.await?;
2241	}
2242
2243	Ok(())
2244}
2245
2246#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2247async fn handle_statement_message<Context>(
2248	ctx: &mut Context,
2249	state: &mut State,
2250	scheduling_parent: Hash,
2251	statement: SignedFullStatementWithPVD,
2252	metrics: &Metrics,
2253) -> Result<(), Error> {
2254	let _timer = metrics.time_process_statement();
2255
2256	// Validator disabling is handled in `maybe_validate_and_import`
2257	match maybe_validate_and_import(ctx, state, scheduling_parent, statement).await {
2258		Err(Error::ValidationFailed(_)) => Ok(()),
2259		Err(e) => Err(e),
2260		Ok(()) => Ok(()),
2261	}
2262}
2263
2264fn handle_get_backable_candidates_message(
2265	state: &State,
2266	requested_candidates: HashMap<ParaId, Vec<BackableCandidateRef>>,
2267	tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2268	metrics: &Metrics,
2269) -> Result<(), Error> {
2270	let _timer = metrics.time_get_backed_candidates();
2271
2272	let mut backed = HashMap::with_capacity(requested_candidates.len());
2273
2274	for (para_id, para_candidates) in requested_candidates {
2275		for candidate_ref in para_candidates.iter() {
2276			let candidate_hash = candidate_ref.candidate_hash;
2277			let scheduling_parent = candidate_ref.scheduling_parent;
2278
2279			let sp_state = match state.per_scheduling_parent.get(&scheduling_parent) {
2280				Some(sp_state) => sp_state,
2281				None => {
2282					gum::debug!(
2283						target: LOG_TARGET,
2284						?scheduling_parent,
2285						?candidate_hash,
2286						"Requested candidate's scheduling parent is out of view",
2287					);
2288					break;
2289				},
2290			};
2291			let maybe_backed_candidate = sp_state
2292				.table
2293				.attested_candidate(
2294					&candidate_hash,
2295					&sp_state.table_context,
2296					sp_state.minimum_backing_votes,
2297				)
2298				.and_then(|attested| table_attested_to_backed(attested, &sp_state.table_context));
2299
2300			if let Some(backed_candidate) = maybe_backed_candidate {
2301				backed
2302					.entry(para_id)
2303					.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2304					.push(backed_candidate);
2305			} else {
2306				break;
2307			}
2308		}
2309	}
2310
2311	tx.send(backed).map_err(|data| Error::Send(data))?;
2312	Ok(())
2313}