Skip to main content

pezkuwi_node_core_backing/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
2// This file is part of Pezkuwi.
3
4// Pezkuwi is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Pezkuwi is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Pezkuwi.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Implements the `CandidateBackingSubsystem`.
18//!
19//! This subsystem maintains the entire responsibility of tracking teyrchain
20//! candidates which can be backed, as well as the issuance of statements
21//! about candidates when run on a validator node.
22//!
23//! There are two types of statements: `Seconded` and `Valid`.
24//! `Seconded` implies `Valid`, and nothing should be stated as
//! `Valid` unless it's already been `Seconded`.
26//!
27//! Validators may only second candidates which fall under their own group
28//! assignment, and they may only second one candidate per depth per active leaf.
//! Candidates which are stated as either `Seconded` or `Valid` by a majority of the
30//! assigned group of validators may be backed on-chain and proceed to the availability
31//! stage.
32//!
33//! Depth is a concept relating to asynchronous backing, by which
34//! short sub-chains of candidates are backed and extended off-chain, and then placed
35//! asynchronously into blocks of the relay chain as those are authored and as the
36//! relay-chain state becomes ready for them. Asynchronous backing allows teyrchains to
37//! grow mostly independently from the state of the relay chain, which gives more time for
38//! teyrchains to be validated and thereby increases performance.
39//!
40//! Most of the work of asynchronous backing is handled by the Prospective Teyrchains
41//! subsystem. The 'depth' of a teyrchain block with respect to a relay chain block is
42//! a measure of how many teyrchain blocks are between the most recent included teyrchain block
43//! in the post-state of the relay-chain block and the candidate. For instance,
44//! a candidate that descends directly from the most recent teyrchain block in the relay-chain
45//! state has depth 0. The child of that candidate would have depth 1. And so on.
46//!
47//! The candidate backing subsystem keeps track of a set of 'active leaves' which are the
48//! most recent blocks in the relay-chain (which is in fact a tree) which could be built
49//! upon. Depth is always measured against active leaves, and the valid relay-parent that
50//! each candidate can have is determined by the active leaves. The Prospective Teyrchains
51//! subsystem enforces that the relay-parent increases monotonically, so that logic
52//! is not handled here. By communicating with the Prospective Teyrchains subsystem,
53//! this subsystem extrapolates an "implicit view" from the set of currently active leaves,
54//! which determines the set of all recent relay-chain block hashes which could be relay-parents
55//! for candidates backed in children of the active leaves.
56//!
57//! In fact, this subsystem relies on the Statement Distribution subsystem to prevent spam
58//! by enforcing the rule that each validator may second at most one candidate per depth per
59//! active leaf. This bounds the number of candidates that the system needs to consider and
60//! is not handled within this subsystem, except for candidates seconded locally.
61//!
62//! This subsystem also handles relay-chain heads which don't support asynchronous backing.
63//! For such active leaves, the only valid relay-parent is the leaf hash itself and the only
64//! allowed depth is 0.
65
66#![deny(unused_crate_dependencies)]
67
68use std::{
69	collections::{HashMap, HashSet},
70	sync::Arc,
71};
72
73use bitvec::vec::BitVec;
74use futures::{
75	channel::{mpsc, oneshot},
76	future::BoxFuture,
77	stream::FuturesOrdered,
78	FutureExt, SinkExt, StreamExt, TryFutureExt,
79};
80use schnellru::{ByLength, LruMap};
81
82use error::{Error, FatalResult};
83use pezkuwi_node_subsystem::{
84	messages::{
85		AvailabilityDistributionMessage, AvailabilityStoreMessage, CanSecondRequest,
86		CandidateBackingMessage, CandidateValidationMessage, CollatorProtocolMessage,
87		HypotheticalCandidate, HypotheticalMembershipRequest, IntroduceSecondedCandidateRequest,
88		ProspectiveTeyrchainsMessage, ProvisionableData, ProvisionerMessage, PvfExecKind,
89		RuntimeApiMessage, RuntimeApiRequest, StatementDistributionMessage,
90		StoreAvailableDataError,
91	},
92	overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, RuntimeApiError, SpawnedSubsystem,
93	SubsystemError,
94};
95use pezkuwi_node_subsystem_util::{
96	self as util,
97	backing_implicit_view::View as ImplicitView,
98	request_claim_queue, request_disabled_validators, request_min_backing_votes,
99	request_node_features, request_session_executor_params, request_session_index_for_child,
100	request_validator_groups, request_validators,
101	runtime::{self, ClaimQueueSnapshot},
102	Validator,
103};
104use pezkuwi_pez_node_primitives::{
105	AvailableData, InvalidCandidate, PoV, SignedFullStatementWithPVD, StatementWithPVD,
106	ValidationResult,
107};
108use pezkuwi_primitives::{
109	BackedCandidate, CandidateCommitments, CandidateHash, CandidateReceiptV2 as CandidateReceipt,
110	CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex, ExecutorParams,
111	GroupIndex, GroupRotationInfo, Hash, Id as ParaId, IndexedVec, NodeFeatures,
112	PersistedValidationData, SessionIndex, SigningContext, ValidationCode, ValidatorId,
113	ValidatorIndex, ValidatorSignature, ValidityAttestation,
114};
115use pezkuwi_statement_table::{
116	generic::AttestedCandidate as TableAttestedCandidate,
117	v2::{
118		SignedStatement as TableSignedStatement, Statement as TableStatement,
119		Summary as TableSummary,
120	},
121	Context as TableContextTrait, Table,
122};
123use pezkuwi_teyrchain_primitives::primitives::IsSystem;
124use pezsp_keystore::KeystorePtr;
125
126mod error;
127
128mod metrics;
129use self::metrics::Metrics;
130
131#[cfg(test)]
132mod tests;
133
/// Log target attached to the log messages emitted from this file.
const LOG_TARGET: &str = "teyrchain::candidate-backing";
135
/// PoV data to validate.
///
/// Either the PoV is already at hand, or it first has to be requested from another
/// validator before validation can start.
enum PoVData {
	/// Already available (from candidate selection).
	Ready(Arc<PoV>),
	/// Needs to be fetched from validator (we are checking a signed statement).
	FetchFromValidator {
		/// Index of the validator the PoV is requested from.
		from_validator: ValidatorIndex,
		/// Hash of the candidate the PoV belongs to.
		candidate_hash: CandidateHash,
		/// Hash of the PoV to fetch.
		pov_hash: Hash,
	},
}
147
/// Command produced by a background validation task and delivered to the main task
/// (together with a relay-parent hash) over the background validation channel.
enum ValidatedCandidateCommand {
	/// We were instructed to second the candidate that has been already validated.
	Second(BackgroundValidationResult),
	/// We were instructed to validate the candidate.
	Attest(BackgroundValidationResult),
	/// We were not able to `Attest` because backing validator did not send us the PoV.
	AttestNoPoV(CandidateHash),
}
156
157impl std::fmt::Debug for ValidatedCandidateCommand {
158	fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
159		let candidate_hash = self.candidate_hash();
160		match *self {
161			ValidatedCandidateCommand::Second(_) => write!(f, "Second({})", candidate_hash),
162			ValidatedCandidateCommand::Attest(_) => write!(f, "Attest({})", candidate_hash),
163			ValidatedCandidateCommand::AttestNoPoV(_) => write!(f, "Attest({})", candidate_hash),
164		}
165	}
166}
167
168impl ValidatedCandidateCommand {
169	fn candidate_hash(&self) -> CandidateHash {
170		match *self {
171			ValidatedCandidateCommand::Second(Ok(ref outputs)) => outputs.candidate.hash(),
172			ValidatedCandidateCommand::Second(Err(ref candidate)) => candidate.hash(),
173			ValidatedCandidateCommand::Attest(Ok(ref outputs)) => outputs.candidate.hash(),
174			ValidatedCandidateCommand::Attest(Err(ref candidate)) => candidate.hash(),
175			ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => candidate_hash,
176		}
177	}
178}
179
/// The candidate backing subsystem.
pub struct CandidateBackingSubsystem {
	/// Keystore handle handed over to the subsystem's main loop on start.
	keystore: KeystorePtr,
	/// Metrics handle handed over to the subsystem's main loop on start.
	metrics: Metrics,
}
185
186impl CandidateBackingSubsystem {
187	/// Create a new instance of the `CandidateBackingSubsystem`.
188	pub fn new(keystore: KeystorePtr, metrics: Metrics) -> Self {
189		Self { keystore, metrics }
190	}
191}
192
193#[overseer::subsystem(CandidateBacking, error = SubsystemError, prefix = self::overseer)]
194impl<Context> CandidateBackingSubsystem
195where
196	Context: Send + Sync,
197{
198	fn start(self, ctx: Context) -> SpawnedSubsystem {
199		let future = async move {
200			run(ctx, self.keystore, self.metrics)
201				.await
202				.map_err(|e| SubsystemError::with_origin("candidate-backing", e))
203		}
204		.boxed();
205
206		SpawnedSubsystem { name: "candidate-backing-subsystem", future }
207	}
208}
209
/// State kept for a single relay parent.
struct PerRelayParentState {
	/// The hash of the relay parent on top of which this job is doing its work.
	parent: Hash,
	/// The node features.
	node_features: NodeFeatures,
	/// The executor parameters.
	executor_params: Arc<ExecutorParams>,
	/// The `CoreIndex` assigned to the local validator at this relay parent.
	assigned_core: Option<CoreIndex>,
	/// The candidates that are backed by enough validators in their group, by hash.
	backed: HashSet<CandidateHash>,
	/// The table of candidates and statements under this relay-parent.
	table: Table<TableContext>,
	/// The table context, including groups.
	table_context: TableContext,
	/// We issued `Seconded` or `Valid` statements about these candidates.
	issued_statements: HashSet<CandidateHash>,
	/// These candidates are undergoing validation in the background.
	awaiting_validation: HashSet<CandidateHash>,
	/// Data needed for retrying in case of `ValidatedCandidateCommand::AttestNoPoV`.
	fallbacks: HashMap<CandidateHash, AttestingData>,
	/// The minimum backing votes threshold.
	minimum_backing_votes: u32,
	/// The number of cores.
	n_cores: u32,
	/// Claim queue state. If the runtime API is not available, it'll be populated with info from
	/// availability cores.
	claim_queue: ClaimQueueSnapshot,
	/// The validator index -> group mapping at this relay parent.
	validator_to_group: Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>,
	/// The associated group rotation information.
	group_rotation_info: GroupRotationInfo,
}
243
/// State kept for a single candidate.
struct PerCandidateState {
	/// The persisted validation data stored alongside the candidate.
	persisted_validation_data: PersistedValidationData,
	/// NOTE(review): presumably `true` once this node has seconded the candidate itself —
	/// confirm against the code that writes this flag.
	seconded_locally: bool,
	/// NOTE(review): presumably the relay parent of the candidate — confirm against the code
	/// that creates this entry.
	relay_parent: Hash,
}
249
/// A cache for storing data per-session to reduce repeated
/// runtime API calls and avoid redundant computations.
///
/// Every field is an independent LRU map keyed by `SessionIndex`.
struct PerSessionCache {
	/// Cache for storing validators list, retrieved from the runtime.
	validators_cache: LruMap<SessionIndex, Arc<Vec<ValidatorId>>>,
	/// Cache for storing node features, retrieved from the runtime.
	node_features_cache: LruMap<SessionIndex, NodeFeatures>,
	/// Cache for storing executor parameters, retrieved from the runtime.
	executor_params_cache: LruMap<SessionIndex, Arc<ExecutorParams>>,
	/// Cache for storing the minimum backing votes threshold, retrieved from the runtime.
	minimum_backing_votes_cache: LruMap<SessionIndex, u32>,
	/// Cache for storing validator-to-group mappings, computed from validator groups.
	validator_to_group_cache:
		LruMap<SessionIndex, Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>>>,
}
265
266impl Default for PerSessionCache {
267	/// Creates a new `PerSessionCache` with a default capacity.
268	fn default() -> Self {
269		Self::new(2)
270	}
271}
272
273impl PerSessionCache {
274	/// Creates a new `PerSessionCache` with a given capacity.
275	fn new(capacity: u32) -> Self {
276		PerSessionCache {
277			validators_cache: LruMap::new(ByLength::new(capacity)),
278			node_features_cache: LruMap::new(ByLength::new(capacity)),
279			executor_params_cache: LruMap::new(ByLength::new(capacity)),
280			minimum_backing_votes_cache: LruMap::new(ByLength::new(capacity)),
281			validator_to_group_cache: LruMap::new(ByLength::new(capacity)),
282		}
283	}
284
285	/// Gets validators from the cache or fetches them from the runtime if not present.
286	async fn validators(
287		&mut self,
288		session_index: SessionIndex,
289		parent: Hash,
290		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
291	) -> Result<Arc<Vec<ValidatorId>>, RuntimeApiError> {
292		// Try to get the validators list from the cache.
293		if let Some(validators) = self.validators_cache.get(&session_index) {
294			return Ok(Arc::clone(validators));
295		}
296
297		// Fetch the validators list from the runtime since it was not in the cache.
298		let validators: Vec<ValidatorId> =
299			request_validators(parent, sender).await.await.map_err(|err| {
300				RuntimeApiError::Execution { runtime_api_name: "Validators", source: Arc::new(err) }
301			})??;
302
303		// Wrap the validators list in an Arc to avoid a deep copy when storing it in the cache.
304		let validators = Arc::new(validators);
305
306		// Cache the fetched validators list for future use.
307		self.validators_cache.insert(session_index, Arc::clone(&validators));
308
309		Ok(validators)
310	}
311
312	/// Gets the node features from the cache or fetches it from the runtime if not present.
313	async fn node_features(
314		&mut self,
315		session_index: SessionIndex,
316		parent: Hash,
317		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
318	) -> Result<NodeFeatures, RuntimeApiError> {
319		// Try to get the node features from the cache.
320		if let Some(node_features) = self.node_features_cache.get(&session_index) {
321			return Ok(node_features.clone());
322		}
323
324		// Fetch the node features from the runtime since it was not in the cache.
325		let node_features = request_node_features(parent, session_index, sender)
326			.await
327			.await
328			.map_err(|err| RuntimeApiError::Execution {
329				runtime_api_name: "NodeFeatures",
330				source: Arc::new(err),
331			})??;
332
333		// Cache the fetched node features for future use.
334		self.node_features_cache.insert(session_index, node_features.clone());
335
336		Ok(node_features)
337	}
338
339	/// Gets the executor parameters from the cache or
340	/// fetches them from the runtime if not present.
341	async fn executor_params(
342		&mut self,
343		session_index: SessionIndex,
344		parent: Hash,
345		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
346	) -> Result<Arc<ExecutorParams>, RuntimeApiError> {
347		// Try to get the executor parameters from the cache.
348		if let Some(executor_params) = self.executor_params_cache.get(&session_index) {
349			return Ok(Arc::clone(executor_params));
350		}
351
352		// Fetch the executor parameters from the runtime since it was not in the cache.
353		let executor_params = request_session_executor_params(parent, session_index, sender)
354			.await
355			.await
356			.map_err(|err| RuntimeApiError::Execution {
357				runtime_api_name: "SessionExecutorParams",
358				source: Arc::new(err),
359			})??
360			.ok_or_else(|| RuntimeApiError::Execution {
361				runtime_api_name: "SessionExecutorParams",
362				source: Arc::new(Error::MissingExecutorParams),
363			})?;
364
365		// Wrap the executor parameters in an Arc to avoid a deep copy when storing it in the cache.
366		let executor_params = Arc::new(executor_params);
367
368		// Cache the fetched executor parameters for future use.
369		self.executor_params_cache.insert(session_index, Arc::clone(&executor_params));
370
371		Ok(executor_params)
372	}
373
374	/// Gets the minimum backing votes threshold from the
375	/// cache or fetches it from the runtime if not present.
376	async fn minimum_backing_votes(
377		&mut self,
378		session_index: SessionIndex,
379		parent: Hash,
380		sender: &mut impl overseer::SubsystemSender<RuntimeApiMessage>,
381	) -> Result<u32, RuntimeApiError> {
382		// Try to get the value from the cache.
383		if let Some(minimum_backing_votes) = self.minimum_backing_votes_cache.get(&session_index) {
384			return Ok(*minimum_backing_votes);
385		}
386
387		// Fetch the value from the runtime since it was not in the cache.
388		let minimum_backing_votes = request_min_backing_votes(parent, session_index, sender)
389			.await
390			.await
391			.map_err(|err| RuntimeApiError::Execution {
392				runtime_api_name: "MinimumBackingVotes",
393				source: Arc::new(err),
394			})??;
395
396		// Cache the fetched value for future use.
397		self.minimum_backing_votes_cache.insert(session_index, minimum_backing_votes);
398
399		Ok(minimum_backing_votes)
400	}
401
402	/// Gets or computes the validator-to-group mapping for a session.
403	fn validator_to_group(
404		&mut self,
405		session_index: SessionIndex,
406		validators: &[ValidatorId],
407		validator_groups: &[Vec<ValidatorIndex>],
408	) -> Arc<IndexedVec<ValidatorIndex, Option<GroupIndex>>> {
409		let validator_to_group = self
410			.validator_to_group_cache
411			.get_or_insert(session_index, || {
412				let mut vector = vec![None; validators.len()];
413
414				for (group_idx, validator_group) in validator_groups.iter().enumerate() {
415					for validator in validator_group {
416						vector[validator.0 as usize] = Some(GroupIndex(group_idx as u32));
417					}
418				}
419
420				Arc::new(IndexedVec::<_, _>::from(vector))
421			})
422			.expect("Just inserted");
423
424		Arc::clone(validator_to_group)
425	}
426}
427
/// The state of the subsystem.
struct State {
	/// The utility for managing the implicit and explicit views in a consistent way.
	implicit_view: ImplicitView,
	/// State tracked for all relay-parents for which backing work is ongoing. This includes
	/// all active leaves.
	per_relay_parent: HashMap<Hash, PerRelayParentState>,
	/// State tracked for all candidates relevant to the implicit view.
	///
	/// This is guaranteed to have an entry for each candidate with a relay parent in the implicit
	/// or explicit view for which a `Seconded` statement has been successfully imported.
	per_candidate: HashMap<CandidateHash, PerCandidateState>,
	/// A local cache for storing per-session data. This cache helps to
	/// reduce repeated calls to the runtime and avoid redundant computations.
	per_session_cache: PerSessionCache,
	/// A clonable sender which is dispatched to background candidate validation tasks to inform
	/// the main task of the result.
	background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The handle to the keystore used for signing.
	keystore: KeystorePtr,
}
449
450impl State {
451	fn new(
452		background_validation_tx: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
453		keystore: KeystorePtr,
454	) -> Self {
455		State {
456			implicit_view: ImplicitView::default(),
457			per_relay_parent: HashMap::default(),
458			per_candidate: HashMap::new(),
459			per_session_cache: PerSessionCache::default(),
460			background_validation_tx,
461			keystore,
462		}
463	}
464}
465
466#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
467async fn run<Context>(
468	mut ctx: Context,
469	keystore: KeystorePtr,
470	metrics: Metrics,
471) -> FatalResult<()> {
472	let (background_validation_tx, mut background_validation_rx) = mpsc::channel(16);
473	let mut state = State::new(background_validation_tx, keystore);
474
475	loop {
476		let res =
477			run_iteration(&mut ctx, &mut state, &metrics, &mut background_validation_rx).await;
478
479		match res {
480			Ok(()) => break,
481			Err(e) => crate::error::log_error(Err(e))?,
482		}
483	}
484
485	Ok(())
486}
487
488#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
489async fn run_iteration<Context>(
490	ctx: &mut Context,
491	state: &mut State,
492	metrics: &Metrics,
493	background_validation_rx: &mut mpsc::Receiver<(Hash, ValidatedCandidateCommand)>,
494) -> Result<(), Error> {
495	loop {
496		futures::select!(
497			validated_command = background_validation_rx.next().fuse() => {
498				if let Some((relay_parent, command)) = validated_command {
499					handle_validated_candidate_command(
500						&mut *ctx,
501						state,
502						relay_parent,
503						command,
504						metrics,
505					).await?;
506				} else {
507					panic!("background_validation_tx always alive at this point; qed");
508				}
509			}
510			from_overseer = ctx.recv().fuse() => {
511				match from_overseer.map_err(Error::OverseerExited)? {
512					FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
513						handle_active_leaves_update(
514							&mut *ctx,
515							update,
516							state,
517						).await?;
518					}
519					FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}
520					FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
521					FromOrchestra::Communication { msg } => {
522						handle_communication(&mut *ctx, state, msg, metrics).await?;
523					}
524				}
525			}
526		)
527	}
528}
529
/// In case a backing validator does not provide a PoV, we need to retry with other backing
/// validators.
///
/// This is the data needed to accomplish this: basically all the data needed for spawning a
/// validation job, plus a list of backing validators we can try.
#[derive(Clone)]
struct AttestingData {
	/// The candidate to attest.
	candidate: CandidateReceipt,
	/// Hash of the PoV we need to fetch.
	pov_hash: Hash,
	/// Validator we are currently trying to get the PoV from.
	from_validator: ValidatorIndex,
	/// Other backing validators we can try in case `from_validator` failed.
	backing: Vec<ValidatorIndex>,
}
546
/// Context handed to the statement table.
#[derive(Default, Debug)]
struct TableContext {
	/// The local validator, if there is one.
	validator: Option<Validator>,
	/// The validators making up the group of each core.
	groups: HashMap<CoreIndex, Vec<ValidatorIndex>>,
	/// NOTE(review): presumably the full validator set this context was built for — confirm
	/// against the code that fills it.
	validators: Vec<ValidatorId>,
	/// Indices of the validators that are disabled.
	disabled_validators: Vec<ValidatorIndex>,
}
554
555impl TableContext {
556	// Returns `true` if the provided `ValidatorIndex` is in the disabled validators list
557	pub fn validator_is_disabled(&self, validator_idx: &ValidatorIndex) -> bool {
558		self.disabled_validators
559			.iter()
560			.any(|disabled_val_idx| *disabled_val_idx == *validator_idx)
561	}
562
563	// Returns `true` if the local validator is in the disabled validators list
564	pub fn local_validator_is_disabled(&self) -> Option<bool> {
565		self.validator.as_ref().map(|v| v.disabled())
566	}
567}
568
569impl TableContextTrait for TableContext {
570	type AuthorityId = ValidatorIndex;
571	type Digest = CandidateHash;
572	type GroupId = CoreIndex;
573	type Signature = ValidatorSignature;
574	type Candidate = CommittedCandidateReceipt;
575
576	fn candidate_digest(candidate: &CommittedCandidateReceipt) -> CandidateHash {
577		candidate.hash()
578	}
579
580	fn is_member_of(&self, authority: &ValidatorIndex, core: &CoreIndex) -> bool {
581		self.groups.get(core).map_or(false, |g| g.iter().any(|a| a == authority))
582	}
583
584	fn get_group_size(&self, group: &CoreIndex) -> Option<usize> {
585		self.groups.get(group).map(|g| g.len())
586	}
587}
588
589// It looks like it's not possible to do an `impl From` given the current state of
590// the code. So this does the necessary conversion.
591fn primitive_statement_to_table(s: &SignedFullStatementWithPVD) -> TableSignedStatement {
592	let statement = match s.payload() {
593		StatementWithPVD::Seconded(c, _) => TableStatement::Seconded(c.clone()),
594		StatementWithPVD::Valid(h) => TableStatement::Valid(*h),
595	};
596
597	TableSignedStatement {
598		statement,
599		signature: s.signature().clone(),
600		sender: s.validator_index(),
601	}
602}
603
604fn table_attested_to_backed(
605	attested: TableAttestedCandidate<
606		CoreIndex,
607		CommittedCandidateReceipt,
608		ValidatorIndex,
609		ValidatorSignature,
610	>,
611	table_context: &TableContext,
612) -> Option<BackedCandidate> {
613	let TableAttestedCandidate { candidate, validity_votes, group_id: core_index } = attested;
614
615	let (ids, validity_votes): (Vec<_>, Vec<ValidityAttestation>) =
616		validity_votes.into_iter().map(|(id, vote)| (id, vote.into())).unzip();
617
618	let group = table_context.groups.get(&core_index)?;
619
620	let mut validator_indices = BitVec::with_capacity(group.len());
621
622	validator_indices.resize(group.len(), false);
623
624	// The order of the validity votes in the backed candidate must match
625	// the order of bits set in the bitfield, which is not necessarily
626	// the order of the `validity_votes` we got from the table.
627	let mut vote_positions = Vec::with_capacity(validity_votes.len());
628	for (orig_idx, id) in ids.iter().enumerate() {
629		if let Some(position) = group.iter().position(|x| x == id) {
630			validator_indices.set(position, true);
631			vote_positions.push((orig_idx, position));
632		} else {
633			gum::warn!(
634				target: LOG_TARGET,
635				"Logic error: Validity vote from table does not correspond to group",
636			);
637
638			return None;
639		}
640	}
641	vote_positions.sort_by_key(|(_orig, pos_in_group)| *pos_in_group);
642
643	Some(BackedCandidate::new(
644		candidate,
645		vote_positions
646			.into_iter()
647			.map(|(pos_in_votes, _pos_in_group)| validity_votes[pos_in_votes].clone())
648			.collect(),
649		validator_indices,
650		core_index,
651	))
652}
653
654async fn store_available_data(
655	sender: &mut impl overseer::CandidateBackingSenderTrait,
656	n_validators: u32,
657	candidate_hash: CandidateHash,
658	available_data: AvailableData,
659	expected_erasure_root: Hash,
660	core_index: CoreIndex,
661	node_features: NodeFeatures,
662) -> Result<(), Error> {
663	let (tx, rx) = oneshot::channel();
664	// Important: the `av-store` subsystem will check if the erasure root of the `available_data`
665	// matches `expected_erasure_root` which was provided by the collator in the `CandidateReceipt`.
666	// This check is consensus critical and the `backing` subsystem relies on it for ensuring
667	// candidate validity.
668	sender
669		.send_message(AvailabilityStoreMessage::StoreAvailableData {
670			candidate_hash,
671			n_validators,
672			available_data,
673			expected_erasure_root,
674			core_index,
675			node_features,
676			tx,
677		})
678		.await;
679
680	rx.await
681		.map_err(Error::StoreAvailableDataChannel)?
682		.map_err(Error::StoreAvailableData)
683}
684
685// Make a `PoV` available.
686//
687// This calls the AV store to write the available data to storage. The AV store also checks the
688// erasure root matches the `expected_erasure_root`.
689// This returns `Err()` on erasure root mismatch or due to any AV store subsystem error.
690//
691// Otherwise, it returns `Ok(())`.
692async fn make_pov_available(
693	sender: &mut impl overseer::CandidateBackingSenderTrait,
694	n_validators: usize,
695	pov: Arc<PoV>,
696	candidate_hash: CandidateHash,
697	validation_data: PersistedValidationData,
698	expected_erasure_root: Hash,
699	core_index: CoreIndex,
700	node_features: NodeFeatures,
701) -> Result<(), Error> {
702	store_available_data(
703		sender,
704		n_validators as u32,
705		candidate_hash,
706		AvailableData { pov, validation_data },
707		expected_erasure_root,
708		core_index,
709		node_features,
710	)
711	.await
712}
713
714async fn request_pov(
715	sender: &mut impl overseer::CandidateBackingSenderTrait,
716	relay_parent: Hash,
717	from_validator: ValidatorIndex,
718	para_id: ParaId,
719	candidate_hash: CandidateHash,
720	pov_hash: Hash,
721) -> Result<Arc<PoV>, Error> {
722	let (tx, rx) = oneshot::channel();
723	sender
724		.send_message(AvailabilityDistributionMessage::FetchPoV {
725			relay_parent,
726			from_validator,
727			para_id,
728			candidate_hash,
729			pov_hash,
730			tx,
731		})
732		.await;
733
734	let pov = rx.await.map_err(|_| Error::FetchPoV)?;
735	Ok(Arc::new(pov))
736}
737
738async fn request_candidate_validation(
739	sender: &mut impl overseer::CandidateBackingSenderTrait,
740	validation_data: PersistedValidationData,
741	validation_code: ValidationCode,
742	candidate_receipt: CandidateReceipt,
743	pov: Arc<PoV>,
744	executor_params: ExecutorParams,
745) -> Result<ValidationResult, Error> {
746	let (tx, rx) = oneshot::channel();
747	let is_system = candidate_receipt.descriptor.para_id().is_system();
748	let relay_parent = candidate_receipt.descriptor.relay_parent();
749
750	sender
751		.send_message(CandidateValidationMessage::ValidateFromExhaustive {
752			validation_data,
753			validation_code,
754			candidate_receipt,
755			pov,
756			executor_params,
757			exec_kind: if is_system {
758				PvfExecKind::BackingSystemParas(relay_parent)
759			} else {
760				PvfExecKind::Backing(relay_parent)
761			},
762			response_sender: tx,
763		})
764		.await;
765
766	match rx.await {
767		Ok(Ok(validation_result)) => Ok(validation_result),
768		Ok(Err(err)) => Err(Error::ValidationFailed(err)),
769		Err(err) => Err(Error::ValidateFromExhaustive(err)),
770	}
771}
772
/// What a background validation task produces for a candidate that passed validation and
/// whose data was stored successfully.
struct BackgroundValidationOutputs {
	/// The receipt of the validated candidate.
	candidate: CandidateReceipt,
	/// The commitments returned by candidate validation.
	commitments: CandidateCommitments,
	/// The persisted validation data returned by candidate validation.
	persisted_validation_data: PersistedValidationData,
}
778
/// Outcome of a background validation: the outputs on success, or the receipt of the
/// candidate that was found to be invalid.
type BackgroundValidationResult = Result<BackgroundValidationOutputs, CandidateReceipt>;
780
/// Everything `validate_and_make_available` needs to validate one candidate in the
/// background.
struct BackgroundValidationParams<S: overseer::CandidateBackingSenderTrait, F> {
	/// Sender used to talk to the other subsystems.
	sender: S,
	/// Channel over which the resulting command is reported back.
	tx_command: mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
	/// The candidate to validate.
	candidate: CandidateReceipt,
	/// Relay parent used for the runtime request and reported alongside the command.
	relay_parent: Hash,
	/// Node features, forwarded to the availability store.
	node_features: NodeFeatures,
	/// Executor parameters, forwarded to candidate validation.
	executor_params: Arc<ExecutorParams>,
	/// Persisted validation data, forwarded to candidate validation.
	persisted_validation_data: PersistedValidationData,
	/// The PoV, or the information needed to fetch it.
	pov: PoVData,
	/// Number of validators, forwarded to the availability store.
	n_validators: usize,
	/// Turns the validation result into the command sent over `tx_command`.
	make_command: F,
}
793
794async fn validate_and_make_available(
795	params: BackgroundValidationParams<
796		impl overseer::CandidateBackingSenderTrait,
797		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Sync,
798	>,
799	core_index: CoreIndex,
800) -> Result<(), Error> {
801	let BackgroundValidationParams {
802		mut sender,
803		mut tx_command,
804		candidate,
805		relay_parent,
806		node_features,
807		executor_params,
808		persisted_validation_data,
809		pov,
810		n_validators,
811		make_command,
812	} = params;
813
814	let validation_code = {
815		let validation_code_hash = candidate.descriptor().validation_code_hash();
816		let (tx, rx) = oneshot::channel();
817		sender
818			.send_message(RuntimeApiMessage::Request(
819				relay_parent,
820				RuntimeApiRequest::ValidationCodeByHash(validation_code_hash, tx),
821			))
822			.await;
823
824		let code = rx.await.map_err(Error::RuntimeApiUnavailable)?;
825		match code {
826			Err(e) => return Err(Error::FetchValidationCode(validation_code_hash, e)),
827			Ok(None) => return Err(Error::NoValidationCode(validation_code_hash)),
828			Ok(Some(c)) => c,
829		}
830	};
831
832	let pov = match pov {
833		PoVData::Ready(pov) => pov,
834		PoVData::FetchFromValidator { from_validator, candidate_hash, pov_hash } => {
835			match request_pov(
836				&mut sender,
837				relay_parent,
838				from_validator,
839				candidate.descriptor.para_id(),
840				candidate_hash,
841				pov_hash,
842			)
843			.await
844			{
845				Err(Error::FetchPoV) => {
846					tx_command
847						.send((
848							relay_parent,
849							ValidatedCandidateCommand::AttestNoPoV(candidate.hash()),
850						))
851						.await
852						.map_err(Error::BackgroundValidationMpsc)?;
853					return Ok(());
854				},
855				Err(err) => return Err(err),
856				Ok(pov) => pov,
857			}
858		},
859	};
860
861	let v = {
862		request_candidate_validation(
863			&mut sender,
864			persisted_validation_data,
865			validation_code,
866			candidate.clone(),
867			pov.clone(),
868			executor_params.as_ref().clone(),
869		)
870		.await?
871	};
872
873	let res = match v {
874		ValidationResult::Valid(commitments, validation_data) => {
875			gum::debug!(
876				target: LOG_TARGET,
877				candidate_hash = ?candidate.hash(),
878				"Validation successful",
879			);
880
881			let erasure_valid = make_pov_available(
882				&mut sender,
883				n_validators,
884				pov.clone(),
885				candidate.hash(),
886				validation_data.clone(),
887				candidate.descriptor.erasure_root(),
888				core_index,
889				node_features,
890			)
891			.await;
892
893			match erasure_valid {
894				Ok(()) => Ok(BackgroundValidationOutputs {
895					candidate,
896					commitments,
897					persisted_validation_data: validation_data,
898				}),
899				Err(Error::StoreAvailableData(StoreAvailableDataError::InvalidErasureRoot)) => {
900					gum::debug!(
901						target: LOG_TARGET,
902						candidate_hash = ?candidate.hash(),
903						actual_commitments = ?commitments,
904						"Erasure root doesn't match the announced by the candidate receipt",
905					);
906					Err(candidate)
907				},
908				// Bubble up any other error.
909				Err(e) => return Err(e),
910			}
911		},
912		ValidationResult::Invalid(InvalidCandidate::CommitmentsHashMismatch) => {
913			// If validation produces a new set of commitments, we vote the candidate as invalid.
914			gum::warn!(
915				target: LOG_TARGET,
916				candidate_hash = ?candidate.hash(),
917				"Validation yielded different commitments",
918			);
919			Err(candidate)
920		},
921		ValidationResult::Invalid(reason) => {
922			gum::warn!(
923				target: LOG_TARGET,
924				candidate_hash = ?candidate.hash(),
925				reason = ?reason,
926				"Validation yielded an invalid candidate",
927			);
928			Err(candidate)
929		},
930	};
931
932	tx_command.send((relay_parent, make_command(res))).await.map_err(Into::into)
933}
934
935#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
936async fn handle_communication<Context>(
937	ctx: &mut Context,
938	state: &mut State,
939	message: CandidateBackingMessage,
940	metrics: &Metrics,
941) -> Result<(), Error> {
942	match message {
943		CandidateBackingMessage::Second(_relay_parent, candidate, pvd, pov) => {
944			handle_second_message(ctx, state, candidate, pvd, pov, metrics).await?;
945		},
946		CandidateBackingMessage::Statement(relay_parent, statement) => {
947			handle_statement_message(ctx, state, relay_parent, statement, metrics).await?;
948		},
949		CandidateBackingMessage::GetBackableCandidates(requested_candidates, tx) => {
950			handle_get_backable_candidates_message(state, requested_candidates, tx, metrics)?
951		},
952		CandidateBackingMessage::CanSecond(request, tx) => {
953			handle_can_second_request(ctx, state, request, tx).await
954		},
955	}
956
957	Ok(())
958}
959
960#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
961async fn handle_active_leaves_update<Context>(
962	ctx: &mut Context,
963	update: ActiveLeavesUpdate,
964	state: &mut State,
965) -> Result<(), Error> {
966	// Activate in implicit view before deactivate, per the docs
967	// on ImplicitView, this is more efficient.
968	let res = if let Some(leaf) = update.activated {
969		let leaf_hash = leaf.hash;
970		Some((leaf, state.implicit_view.activate_leaf(ctx.sender(), leaf_hash).await.map(|_| ())))
971	} else {
972		None
973	};
974
975	for deactivated in update.deactivated {
976		state.implicit_view.deactivate_leaf(deactivated);
977	}
978
979	// clean up `per_relay_parent` according to ancestry
980	// of leaves. we do this so we can clean up candidates right after
981	// as a result.
982	{
983		let remaining: HashSet<_> = state.implicit_view.all_allowed_relay_parents().collect();
984
985		state.per_relay_parent.retain(|r, _| remaining.contains(&r));
986	}
987
988	// clean up `per_candidate` according to which relay-parents
989	// are known.
990	state
991		.per_candidate
992		.retain(|_, pc| state.per_relay_parent.contains_key(&pc.relay_parent));
993
994	// Get relay parents which might be fresh but might be known already
995	// that are explicit or implicit from the new active leaf.
996	let fresh_relay_parents = match res {
997		None => return Ok(()),
998		Some((leaf, Ok(_))) => {
999			let fresh_relay_parents =
1000				state.implicit_view.known_allowed_relay_parents_under(&leaf.hash, None);
1001
1002			let fresh_relay_parent = match fresh_relay_parents {
1003				Some(f) => f.to_vec(),
1004				None => {
1005					gum::warn!(
1006						target: LOG_TARGET,
1007						leaf_hash = ?leaf.hash,
1008						"Implicit view gave no relay-parents"
1009					);
1010
1011					vec![leaf.hash]
1012				},
1013			};
1014			fresh_relay_parent
1015		},
1016		Some((leaf, Err(e))) => {
1017			gum::debug!(
1018				target: LOG_TARGET,
1019				leaf_hash = ?leaf.hash,
1020				err = ?e,
1021				"Failed to load implicit view for leaf."
1022			);
1023
1024			return Ok(());
1025		},
1026	};
1027
1028	// add entries in `per_relay_parent`. for all new relay-parents.
1029	for maybe_new in fresh_relay_parents {
1030		if state.per_relay_parent.contains_key(&maybe_new) {
1031			continue;
1032		}
1033
1034		// construct a `PerRelayParent` from the runtime API
1035		// and insert it.
1036		let per = construct_per_relay_parent_state(
1037			ctx,
1038			maybe_new,
1039			&state.keystore,
1040			&mut state.per_session_cache,
1041		)
1042		.await?;
1043
1044		if let Some(per) = per {
1045			state.per_relay_parent.insert(maybe_new, per);
1046		}
1047	}
1048
1049	Ok(())
1050}
1051
/// Unwraps the `Ok` value of a runtime API result, or leaves the enclosing function on `Err`.
///
/// On `Err`, the error is converted into a `runtime::Error`, converted once more with
/// `.into()` and handed to `error::log_error`. If that call returns an error it is propagated
/// with `?`; otherwise the enclosing function returns `Ok(None)`.
///
/// Because of the `?` and the `return Ok(None)`, the macro can only be used inside functions
/// whose return type is a `Result` wrapping an `Option`.
macro_rules! try_runtime_api {
	($x: expr) => {
		match $x {
			Ok(x) => x,
			Err(err) => {
				// Only bubble up fatal errors.
				error::log_error(Err(Into::<runtime::Error>::into(err).into()))?;

				// We can't do candidate validation work if we don't have the
				// requisite runtime API data. But these errors should not take
				// down the node.
				return Ok(None);
			},
		}
	};
}
1068
1069fn core_index_from_statement(
1070	validator_to_group: &IndexedVec<ValidatorIndex, Option<GroupIndex>>,
1071	group_rotation_info: &GroupRotationInfo,
1072	n_cores: u32,
1073	claim_queue: &ClaimQueueSnapshot,
1074	statement: &SignedFullStatementWithPVD,
1075) -> Option<CoreIndex> {
1076	let compact_statement = statement.as_unchecked();
1077	let candidate_hash = CandidateHash(*compact_statement.unchecked_payload().candidate_hash());
1078
1079	gum::trace!(
1080		target:LOG_TARGET,
1081		?group_rotation_info,
1082		?statement,
1083		?validator_to_group,
1084		n_cores,
1085		?candidate_hash,
1086		"Extracting core index from statement"
1087	);
1088
1089	let statement_validator_index = statement.validator_index();
1090	let Some(Some(group_index)) = validator_to_group.get(statement_validator_index) else {
1091		gum::debug!(
1092			target: LOG_TARGET,
1093			?group_rotation_info,
1094			?statement,
1095			?validator_to_group,
1096			n_cores,
1097			?candidate_hash,
1098			"Invalid validator index: {:?}",
1099			statement_validator_index
1100		);
1101		return None;
1102	};
1103
1104	// First check if the statement para id matches the core assignment.
1105	let core_index = group_rotation_info.core_for_group(*group_index, n_cores as _);
1106
1107	if core_index.0 > n_cores {
1108		gum::warn!(target: LOG_TARGET, ?candidate_hash, ?core_index, n_cores, "Invalid CoreIndex");
1109		return None;
1110	}
1111
1112	if let StatementWithPVD::Seconded(candidate, _pvd) = statement.payload() {
1113		let candidate_para_id = candidate.descriptor.para_id();
1114		let mut assigned_paras = claim_queue.iter_claims_for_core(&core_index);
1115
1116		if !assigned_paras.any(|id| id == &candidate_para_id) {
1117			gum::debug!(
1118				target: LOG_TARGET,
1119				?candidate_hash,
1120				?core_index,
1121				assigned_paras = ?claim_queue.iter_claims_for_core(&core_index).collect::<Vec<_>>(),
1122				?candidate_para_id,
1123				"Invalid CoreIndex, core is not assigned to this para_id"
1124			);
1125			return None;
1126		}
1127		return Some(core_index);
1128	} else {
1129		return Some(core_index);
1130	}
1131}
1132
1133/// Load the data necessary to do backing work on top of a relay-parent.
1134#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1135async fn construct_per_relay_parent_state<Context>(
1136	ctx: &mut Context,
1137	relay_parent: Hash,
1138	keystore: &KeystorePtr,
1139	per_session_cache: &mut PerSessionCache,
1140) -> Result<Option<PerRelayParentState>, Error> {
1141	let parent = relay_parent;
1142
1143	let (session_index, groups, claim_queue, disabled_validators) = futures::try_join!(
1144		request_session_index_for_child(parent, ctx.sender()).await,
1145		request_validator_groups(parent, ctx.sender()).await,
1146		request_claim_queue(parent, ctx.sender()).await,
1147		request_disabled_validators(parent, ctx.sender()).await,
1148	)
1149	.map_err(Error::JoinMultiple)?;
1150
1151	let session_index = try_runtime_api!(session_index);
1152
1153	let validators = per_session_cache.validators(session_index, parent, ctx.sender()).await;
1154	let validators = try_runtime_api!(validators);
1155
1156	let node_features = per_session_cache.node_features(session_index, parent, ctx.sender()).await;
1157	let node_features = try_runtime_api!(node_features);
1158
1159	let executor_params =
1160		per_session_cache.executor_params(session_index, parent, ctx.sender()).await;
1161	let executor_params = try_runtime_api!(executor_params);
1162
1163	gum::debug!(target: LOG_TARGET, ?parent, "New state");
1164
1165	let (validator_groups, group_rotation_info) = try_runtime_api!(groups);
1166
1167	let minimum_backing_votes = per_session_cache
1168		.minimum_backing_votes(session_index, parent, ctx.sender())
1169		.await;
1170	let minimum_backing_votes = try_runtime_api!(minimum_backing_votes);
1171	let claim_queue = try_runtime_api!(claim_queue);
1172	let disabled_validators = try_runtime_api!(disabled_validators);
1173
1174	let signing_context = SigningContext { parent_hash: parent, session_index };
1175	let validator = match Validator::construct(
1176		&validators,
1177		&disabled_validators,
1178		signing_context.clone(),
1179		keystore.clone(),
1180	) {
1181		Ok(v) => Some(v),
1182		Err(util::Error::NotAValidator) => None,
1183		Err(e) => {
1184			gum::warn!(
1185				target: LOG_TARGET,
1186				err = ?e,
1187				"Cannot participate in candidate backing",
1188			);
1189
1190			return Ok(None);
1191		},
1192	};
1193
1194	let n_cores = validator_groups.len();
1195
1196	let mut groups = HashMap::<CoreIndex, Vec<ValidatorIndex>>::new();
1197	let mut assigned_core = None;
1198
1199	for idx in 0..n_cores {
1200		let core_index = CoreIndex(idx as _);
1201
1202		if !claim_queue.contains_key(&core_index) {
1203			continue;
1204		}
1205
1206		let group_index = group_rotation_info.group_for_core(core_index, n_cores);
1207		if let Some(g) = validator_groups.get(group_index.0 as usize) {
1208			if validator.as_ref().map_or(false, |v| g.contains(&v.index())) {
1209				assigned_core = Some(core_index);
1210			}
1211			groups.insert(core_index, g.clone());
1212		}
1213	}
1214	gum::debug!(target: LOG_TARGET, ?groups, "TableContext");
1215
1216	let validator_to_group =
1217		per_session_cache.validator_to_group(session_index, &validators, &validator_groups);
1218
1219	let table_context =
1220		TableContext { validator, groups, validators: validators.to_vec(), disabled_validators };
1221
1222	Ok(Some(PerRelayParentState {
1223		parent,
1224		node_features,
1225		executor_params,
1226		assigned_core,
1227		backed: HashSet::new(),
1228		table: Table::new(),
1229		table_context,
1230		issued_statements: HashSet::new(),
1231		awaiting_validation: HashSet::new(),
1232		fallbacks: HashMap::new(),
1233		minimum_backing_votes,
1234		n_cores: validator_groups.len() as u32,
1235		claim_queue: ClaimQueueSnapshot::from(claim_queue),
1236		validator_to_group,
1237		group_rotation_info,
1238	}))
1239}
1240
/// Outcome of [`seconding_sanity_check`]: whether a candidate may be seconded.
enum SecondingAllowed {
	/// Seconding is not allowed.
	No,
	/// Seconding is allowed; carries the leaves on which it is allowed.
	Yes(Vec<Hash>),
}
1246
1247/// Checks whether a candidate can be seconded based on its hypothetical membership in the fragment
1248/// chain.
1249#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1250async fn seconding_sanity_check<Context>(
1251	ctx: &mut Context,
1252	implicit_view: &ImplicitView,
1253	hypothetical_candidate: HypotheticalCandidate,
1254) -> SecondingAllowed {
1255	let mut leaves_for_seconding = Vec::new();
1256	let mut responses = FuturesOrdered::<BoxFuture<'_, Result<_, oneshot::Canceled>>>::new();
1257
1258	let candidate_para = hypothetical_candidate.candidate_para();
1259	let candidate_relay_parent = hypothetical_candidate.relay_parent();
1260	let candidate_hash = hypothetical_candidate.candidate_hash();
1261
1262	for head in implicit_view.leaves() {
1263		// Check that the candidate relay parent is allowed for para, skip the
1264		// leaf otherwise.
1265		let allowed_parents_for_para =
1266			implicit_view.known_allowed_relay_parents_under(head, Some(candidate_para));
1267		if !allowed_parents_for_para.unwrap_or_default().contains(&candidate_relay_parent) {
1268			continue;
1269		}
1270
1271		let (tx, rx) = oneshot::channel();
1272		ctx.send_message(ProspectiveTeyrchainsMessage::GetHypotheticalMembership(
1273			HypotheticalMembershipRequest {
1274				candidates: vec![hypothetical_candidate.clone()],
1275				fragment_chain_relay_parent: Some(*head),
1276			},
1277			tx,
1278		))
1279		.await;
1280		let response = rx.map_ok(move |candidate_memberships| {
1281			let is_member_or_potential = candidate_memberships
1282				.into_iter()
1283				.find_map(|(candidate, leaves)| {
1284					(candidate.candidate_hash() == candidate_hash).then_some(leaves)
1285				})
1286				.and_then(|leaves| leaves.into_iter().find(|leaf| leaf == head))
1287				.is_some();
1288
1289			(is_member_or_potential, head)
1290		});
1291		responses.push_back(response.boxed());
1292	}
1293
1294	if responses.is_empty() {
1295		return SecondingAllowed::No;
1296	}
1297
1298	while let Some(response) = responses.next().await {
1299		match response {
1300			Err(oneshot::Canceled) => {
1301				gum::warn!(
1302					target: LOG_TARGET,
1303					"Failed to reach prospective teyrchains subsystem for hypothetical membership",
1304				);
1305
1306				return SecondingAllowed::No;
1307			},
1308			Ok((is_member_or_potential, head)) => match is_member_or_potential {
1309				false => {
1310					gum::debug!(
1311						target: LOG_TARGET,
1312						?candidate_hash,
1313						leaf_hash = ?head,
1314						"Refusing to second candidate at leaf. Is not a potential member.",
1315					);
1316				},
1317				true => {
1318					leaves_for_seconding.push(*head);
1319				},
1320			},
1321		}
1322	}
1323
1324	if leaves_for_seconding.is_empty() {
1325		SecondingAllowed::No
1326	} else {
1327		SecondingAllowed::Yes(leaves_for_seconding)
1328	}
1329}
1330
1331/// Performs seconding sanity check for an advertisement.
1332#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1333async fn handle_can_second_request<Context>(
1334	ctx: &mut Context,
1335	state: &State,
1336	request: CanSecondRequest,
1337	tx: oneshot::Sender<bool>,
1338) {
1339	let relay_parent = request.candidate_relay_parent;
1340	let response = if state.per_relay_parent.get(&relay_parent).is_some() {
1341		let hypothetical_candidate = HypotheticalCandidate::Incomplete {
1342			candidate_hash: request.candidate_hash,
1343			candidate_para: request.candidate_para_id,
1344			parent_head_data_hash: request.parent_head_data_hash,
1345			candidate_relay_parent: relay_parent,
1346		};
1347
1348		let result =
1349			seconding_sanity_check(ctx, &state.implicit_view, hypothetical_candidate).await;
1350
1351		match result {
1352			SecondingAllowed::No => false,
1353			SecondingAllowed::Yes(leaves) => !leaves.is_empty(),
1354		}
1355	} else {
1356		// Relay parent is unknown or async backing is disabled.
1357		false
1358	};
1359
1360	let _ = tx.send(response);
1361}
1362
1363#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1364async fn handle_validated_candidate_command<Context>(
1365	ctx: &mut Context,
1366	state: &mut State,
1367	relay_parent: Hash,
1368	command: ValidatedCandidateCommand,
1369	metrics: &Metrics,
1370) -> Result<(), Error> {
1371	match state.per_relay_parent.get_mut(&relay_parent) {
1372		Some(rp_state) => {
1373			let candidate_hash = command.candidate_hash();
1374			rp_state.awaiting_validation.remove(&candidate_hash);
1375
1376			match command {
1377				ValidatedCandidateCommand::Second(res) => match res {
1378					Ok(outputs) => {
1379						let BackgroundValidationOutputs {
1380							candidate,
1381							commitments,
1382							persisted_validation_data,
1383						} = outputs;
1384
1385						if rp_state.issued_statements.contains(&candidate_hash) {
1386							return Ok(());
1387						}
1388
1389						let receipt = CommittedCandidateReceipt {
1390							descriptor: candidate.descriptor.clone(),
1391							commitments,
1392						};
1393
1394						let hypothetical_candidate = HypotheticalCandidate::Complete {
1395							candidate_hash,
1396							receipt: Arc::new(receipt.clone()),
1397							persisted_validation_data: persisted_validation_data.clone(),
1398						};
1399						// sanity check that we're allowed to second the candidate
1400						// and that it doesn't conflict with other candidates we've
1401						// seconded.
1402						if let SecondingAllowed::No = seconding_sanity_check(
1403							ctx,
1404							&state.implicit_view,
1405							hypothetical_candidate,
1406						)
1407						.await
1408						{
1409							return Ok(());
1410						};
1411
1412						let statement =
1413							StatementWithPVD::Seconded(receipt, persisted_validation_data);
1414
1415						// If we get an Error::RejectedByProspectiveTeyrchains,
1416						// then the statement has not been distributed or imported into
1417						// the table.
1418						let res = sign_import_and_distribute_statement(
1419							ctx,
1420							rp_state,
1421							&mut state.per_candidate,
1422							statement,
1423							state.keystore.clone(),
1424							metrics,
1425						)
1426						.await;
1427
1428						if let Err(Error::RejectedByProspectiveTeyrchains) = res {
1429							let candidate_hash = candidate.hash();
1430							gum::debug!(
1431								target: LOG_TARGET,
1432								relay_parent = ?candidate.descriptor().relay_parent(),
1433								?candidate_hash,
1434								"Attempted to second candidate but was rejected by prospective teyrchains",
1435							);
1436
1437							// Ensure the collator is reported.
1438							ctx.send_message(CollatorProtocolMessage::Invalid(
1439								candidate.descriptor().relay_parent(),
1440								candidate,
1441							))
1442							.await;
1443
1444							return Ok(());
1445						}
1446
1447						if let Some(stmt) = res? {
1448							match state.per_candidate.get_mut(&candidate_hash) {
1449								None => {
1450									gum::warn!(
1451										target: LOG_TARGET,
1452										?candidate_hash,
1453										"Missing `per_candidate` for seconded candidate.",
1454									);
1455								},
1456								Some(p) => p.seconded_locally = true,
1457							}
1458
1459							rp_state.issued_statements.insert(candidate_hash);
1460
1461							metrics.on_candidate_seconded();
1462							ctx.send_message(CollatorProtocolMessage::Seconded(
1463								rp_state.parent,
1464								StatementWithPVD::drop_pvd_from_signed(stmt),
1465							))
1466							.await;
1467						}
1468					},
1469					Err(candidate) => {
1470						ctx.send_message(CollatorProtocolMessage::Invalid(
1471							rp_state.parent,
1472							candidate,
1473						))
1474						.await;
1475					},
1476				},
1477				ValidatedCandidateCommand::Attest(res) => {
1478					// We are done - avoid new validation spawns:
1479					rp_state.fallbacks.remove(&candidate_hash);
1480					// sanity check.
1481					if !rp_state.issued_statements.contains(&candidate_hash) {
1482						if res.is_ok() {
1483							let statement = StatementWithPVD::Valid(candidate_hash);
1484
1485							sign_import_and_distribute_statement(
1486								ctx,
1487								rp_state,
1488								&mut state.per_candidate,
1489								statement,
1490								state.keystore.clone(),
1491								metrics,
1492							)
1493							.await?;
1494						}
1495						rp_state.issued_statements.insert(candidate_hash);
1496					}
1497				},
1498				ValidatedCandidateCommand::AttestNoPoV(candidate_hash) => {
1499					if let Some(attesting) = rp_state.fallbacks.get_mut(&candidate_hash) {
1500						if let Some(index) = attesting.backing.pop() {
1501							attesting.from_validator = index;
1502							let attesting = attesting.clone();
1503
1504							// The candidate state should be available because we've
1505							// validated it before, the relay-parent is still around,
1506							// and candidates are pruned on the basis of relay-parents.
1507							//
1508							// If it's not, then no point in validating it anyway.
1509							if let Some(pvd) = state
1510								.per_candidate
1511								.get(&candidate_hash)
1512								.map(|pc| pc.persisted_validation_data.clone())
1513							{
1514								kick_off_validation_work(
1515									ctx,
1516									rp_state,
1517									pvd,
1518									&state.background_validation_tx,
1519									attesting,
1520								)
1521								.await?;
1522							}
1523						}
1524					} else {
1525						gum::warn!(
1526							target: LOG_TARGET,
1527							"AttestNoPoV was triggered without fallback being available."
1528						);
1529						debug_assert!(false);
1530					}
1531				},
1532			}
1533		},
1534		None => {
1535			// simple race condition; can be ignored = this relay-parent
1536			// is no longer relevant.
1537		},
1538	}
1539
1540	Ok(())
1541}
1542
1543fn sign_statement(
1544	rp_state: &PerRelayParentState,
1545	statement: StatementWithPVD,
1546	keystore: KeystorePtr,
1547	metrics: &Metrics,
1548) -> Option<SignedFullStatementWithPVD> {
1549	let signed = rp_state
1550		.table_context
1551		.validator
1552		.as_ref()?
1553		.sign(keystore, statement)
1554		.ok()
1555		.flatten()?;
1556	metrics.on_statement_signed();
1557	Some(signed)
1558}
1559
1560/// Import a statement into the statement table and return the summary of the import.
1561///
1562/// This will fail with `Error::RejectedByProspectiveTeyrchains` if the message type is seconded,
1563/// the candidate is fresh, and any of the following are true:
1564/// 1. There is no `PersistedValidationData` attached.
1565/// 2. Prospective teyrchains subsystem returned an empty `HypotheticalMembership` i.e. did not
1566///    recognize the candidate as being applicable to any of the active leaves.
1567#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1568async fn import_statement<Context>(
1569	ctx: &mut Context,
1570	rp_state: &mut PerRelayParentState,
1571	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1572	statement: &SignedFullStatementWithPVD,
1573) -> Result<Option<TableSummary>, Error> {
1574	let candidate_hash = statement.payload().candidate_hash();
1575
1576	gum::debug!(
1577		target: LOG_TARGET,
1578		statement = ?statement.payload().to_compact(),
1579		validator_index = statement.validator_index().0,
1580		?candidate_hash,
1581		"Importing statement",
1582	);
1583
1584	// If this is a new candidate (statement is 'seconded' and candidate is unknown),
1585	// we need to create an entry in the `PerCandidateState` map.
1586	//
1587	// We also need to inform the prospective teyrchains subsystem of the seconded candidate.
1588	// If `ProspectiveTeyrchainsMessage::Second` fails, then we return
1589	// Error::RejectedByProspectiveTeyrchains.
1590	//
1591	// Persisted Validation Data should be available - it may already be available
1592	// if this is a candidate we are seconding.
1593	//
1594	// We should also not accept any candidates which have no valid depths under any of
1595	// our active leaves.
1596	if let StatementWithPVD::Seconded(candidate, pvd) = statement.payload() {
1597		if !per_candidate.contains_key(&candidate_hash) {
1598			let (tx, rx) = oneshot::channel();
1599			ctx.send_message(ProspectiveTeyrchainsMessage::IntroduceSecondedCandidate(
1600				IntroduceSecondedCandidateRequest {
1601					candidate_para: candidate.descriptor.para_id(),
1602					candidate_receipt: candidate.clone(),
1603					persisted_validation_data: pvd.clone(),
1604				},
1605				tx,
1606			))
1607			.await;
1608
1609			match rx.await {
1610				Err(oneshot::Canceled) => {
1611					gum::warn!(
1612						target: LOG_TARGET,
1613						"Could not reach the Prospective Teyrchains subsystem."
1614					);
1615
1616					return Err(Error::RejectedByProspectiveTeyrchains);
1617				},
1618				Ok(false) => return Err(Error::RejectedByProspectiveTeyrchains),
1619				Ok(true) => {},
1620			}
1621
1622			// Only save the candidate if it was approved by prospective teyrchains.
1623			per_candidate.insert(
1624				candidate_hash,
1625				PerCandidateState {
1626					persisted_validation_data: pvd.clone(),
1627					// This is set after importing when seconding locally.
1628					seconded_locally: false,
1629					relay_parent: candidate.descriptor.relay_parent(),
1630				},
1631			);
1632		}
1633	}
1634
1635	let stmt = primitive_statement_to_table(statement);
1636
1637	let core = core_index_from_statement(
1638		&rp_state.validator_to_group,
1639		&rp_state.group_rotation_info,
1640		rp_state.n_cores,
1641		&rp_state.claim_queue,
1642		statement,
1643	)
1644	.ok_or(Error::CoreIndexUnavailable)?;
1645
1646	Ok(rp_state.table.import_statement(&rp_state.table_context, core, stmt))
1647}
1648
1649/// Handles a summary received from [`import_statement`] and dispatches `Backed` notifications and
1650/// misbehaviors as a result of importing a statement.
1651#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1652async fn post_import_statement_actions<Context>(
1653	ctx: &mut Context,
1654	rp_state: &mut PerRelayParentState,
1655	summary: Option<&TableSummary>,
1656) {
1657	if let Some(attested) = summary.as_ref().and_then(|s| {
1658		rp_state.table.attested_candidate(
1659			&s.candidate,
1660			&rp_state.table_context,
1661			rp_state.minimum_backing_votes,
1662		)
1663	}) {
1664		let candidate_hash = attested.candidate.hash();
1665
1666		// `HashSet::insert` returns true if the thing wasn't in there already.
1667		if rp_state.backed.insert(candidate_hash) {
1668			if let Some(backed) = table_attested_to_backed(attested, &rp_state.table_context) {
1669				let para_id = backed.candidate().descriptor.para_id();
1670				gum::debug!(
1671					target: LOG_TARGET,
1672					candidate_hash = ?candidate_hash,
1673					relay_parent = ?rp_state.parent,
1674					%para_id,
1675					"Candidate backed",
1676				);
1677
1678				// Inform the prospective teyrchains subsystem
1679				// that the candidate is now backed.
1680				ctx.send_message(ProspectiveTeyrchainsMessage::CandidateBacked(
1681					para_id,
1682					candidate_hash,
1683				))
1684				.await;
1685				// Notify statement distribution of backed candidate.
1686				ctx.send_message(StatementDistributionMessage::Backed(candidate_hash)).await;
1687			} else {
1688				gum::debug!(target: LOG_TARGET, ?candidate_hash, "Cannot get BackedCandidate");
1689			}
1690		} else {
1691			gum::debug!(target: LOG_TARGET, ?candidate_hash, "Candidate already known");
1692		}
1693	} else {
1694		gum::debug!(target: LOG_TARGET, "No attested candidate");
1695	}
1696
1697	issue_new_misbehaviors(ctx, rp_state.parent, &mut rp_state.table);
1698}
1699
1700/// Check if there have happened any new misbehaviors and issue necessary messages.
1701#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1702fn issue_new_misbehaviors<Context>(
1703	ctx: &mut Context,
1704	relay_parent: Hash,
1705	table: &mut Table<TableContext>,
1706) {
1707	// collect the misbehaviors to avoid double mutable self borrow issues
1708	let misbehaviors: Vec<_> = table.drain_misbehaviors().collect();
1709	for (validator_id, report) in misbehaviors {
1710		// The provisioner waits on candidate-backing, which means
1711		// that we need to send unbounded messages to avoid cycles.
1712		//
1713		// Misbehaviors are bounded by the number of validators and
1714		// the block production protocol.
1715		ctx.send_unbounded_message(ProvisionerMessage::ProvisionableData(
1716			relay_parent,
1717			ProvisionableData::MisbehaviorReport(relay_parent, validator_id, report),
1718		));
1719	}
1720}
1721
1722/// Sign, import, and distribute a statement.
1723#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1724async fn sign_import_and_distribute_statement<Context>(
1725	ctx: &mut Context,
1726	rp_state: &mut PerRelayParentState,
1727	per_candidate: &mut HashMap<CandidateHash, PerCandidateState>,
1728	statement: StatementWithPVD,
1729	keystore: KeystorePtr,
1730	metrics: &Metrics,
1731) -> Result<Option<SignedFullStatementWithPVD>, Error> {
1732	if let Some(signed_statement) = sign_statement(&*rp_state, statement, keystore, metrics) {
1733		let summary = import_statement(ctx, rp_state, per_candidate, &signed_statement).await?;
1734
1735		// `Share` must always be sent before `Backed`. We send the latter in
1736		// `post_import_statement_action` below.
1737		let smsg = StatementDistributionMessage::Share(rp_state.parent, signed_statement.clone());
1738		ctx.send_unbounded_message(smsg);
1739
1740		post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1741
1742		Ok(Some(signed_statement))
1743	} else {
1744		Ok(None)
1745	}
1746}
1747
1748#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1749async fn background_validate_and_make_available<Context>(
1750	ctx: &mut Context,
1751	rp_state: &mut PerRelayParentState,
1752	params: BackgroundValidationParams<
1753		impl overseer::CandidateBackingSenderTrait,
1754		impl Fn(BackgroundValidationResult) -> ValidatedCandidateCommand + Send + 'static + Sync,
1755	>,
1756) -> Result<(), Error> {
1757	let candidate_hash = params.candidate.hash();
1758	let Some(core_index) = rp_state.assigned_core else { return Ok(()) };
1759	if rp_state.awaiting_validation.insert(candidate_hash) {
1760		// spawn background task.
1761		let bg = async move {
1762			if let Err(error) = validate_and_make_available(params, core_index).await {
1763				if let Error::BackgroundValidationMpsc(error) = error {
1764					gum::debug!(
1765						target: LOG_TARGET,
1766						?candidate_hash,
1767						?error,
1768						"Mpsc background validation mpsc died during validation- leaf no longer active?"
1769					);
1770				} else {
1771					gum::error!(
1772						target: LOG_TARGET,
1773						?candidate_hash,
1774						?error,
1775						"Failed to validate and make available",
1776					);
1777				}
1778			}
1779		};
1780
1781		ctx.spawn("backing-validation", bg.boxed())
1782			.map_err(|_| Error::FailedToSpawnBackgroundTask)?;
1783	}
1784
1785	Ok(())
1786}
1787
1788/// Kick off validation work and distribute the result as a signed statement.
1789#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1790async fn kick_off_validation_work<Context>(
1791	ctx: &mut Context,
1792	rp_state: &mut PerRelayParentState,
1793	persisted_validation_data: PersistedValidationData,
1794	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1795	attesting: AttestingData,
1796) -> Result<(), Error> {
1797	// Do nothing if the local validator is disabled or not a validator at all
1798	match rp_state.table_context.local_validator_is_disabled() {
1799		Some(true) => {
1800			gum::info!(target: LOG_TARGET, "We are disabled - don't kick off validation");
1801			return Ok(());
1802		},
1803		Some(false) => {}, // we are not disabled - move on
1804		None => {
1805			gum::debug!(target: LOG_TARGET, "We are not a validator - don't kick off validation");
1806			return Ok(());
1807		},
1808	}
1809
1810	let candidate_hash = attesting.candidate.hash();
1811	if rp_state.issued_statements.contains(&candidate_hash) {
1812		return Ok(());
1813	}
1814
1815	gum::debug!(
1816		target: LOG_TARGET,
1817		candidate_hash = ?candidate_hash,
1818		candidate_receipt = ?attesting.candidate,
1819		"Kicking off validation",
1820	);
1821
1822	let bg_sender = ctx.sender().clone();
1823	let pov = PoVData::FetchFromValidator {
1824		from_validator: attesting.from_validator,
1825		candidate_hash,
1826		pov_hash: attesting.pov_hash,
1827	};
1828
1829	background_validate_and_make_available(
1830		ctx,
1831		rp_state,
1832		BackgroundValidationParams {
1833			sender: bg_sender,
1834			tx_command: background_validation_tx.clone(),
1835			candidate: attesting.candidate,
1836			relay_parent: rp_state.parent,
1837			node_features: rp_state.node_features.clone(),
1838			executor_params: Arc::clone(&rp_state.executor_params),
1839			persisted_validation_data,
1840			pov,
1841			n_validators: rp_state.table_context.validators.len(),
1842			make_command: ValidatedCandidateCommand::Attest,
1843		},
1844	)
1845	.await
1846}
1847
1848/// Import the statement and kick off validation work if it is a part of our assignment.
1849#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1850async fn maybe_validate_and_import<Context>(
1851	ctx: &mut Context,
1852	state: &mut State,
1853	relay_parent: Hash,
1854	statement: SignedFullStatementWithPVD,
1855) -> Result<(), Error> {
1856	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
1857		Some(r) => r,
1858		None => {
1859			gum::trace!(
1860				target: LOG_TARGET,
1861				?relay_parent,
1862				"Received statement for unknown relay-parent"
1863			);
1864
1865			return Ok(());
1866		},
1867	};
1868
1869	// Don't import statement if the sender is disabled
1870	if rp_state.table_context.validator_is_disabled(&statement.validator_index()) {
1871		gum::debug!(
1872			target: LOG_TARGET,
1873			sender_validator_idx = ?statement.validator_index(),
1874			"Not importing statement because the sender is disabled"
1875		);
1876		return Ok(());
1877	}
1878
1879	let res = import_statement(ctx, rp_state, &mut state.per_candidate, &statement).await;
1880
1881	// if we get an Error::RejectedByProspectiveTeyrchains,
1882	// we will do nothing.
1883	if let Err(Error::RejectedByProspectiveTeyrchains) = res {
1884		gum::debug!(
1885			target: LOG_TARGET,
1886			?relay_parent,
1887			"Statement rejected by prospective teyrchains."
1888		);
1889
1890		return Ok(());
1891	}
1892
1893	let summary = res?;
1894	post_import_statement_actions(ctx, rp_state, summary.as_ref()).await;
1895
1896	if let Some(summary) = summary {
1897		// import_statement already takes care of communicating with the
1898		// prospective teyrchains subsystem. At this point, the candidate
1899		// has already been accepted by the subsystem.
1900
1901		let candidate_hash = summary.candidate;
1902
1903		if Some(summary.group_id) != rp_state.assigned_core {
1904			return Ok(());
1905		}
1906
1907		let attesting = match statement.payload() {
1908			StatementWithPVD::Seconded(receipt, _) => {
1909				let attesting = AttestingData {
1910					candidate: rp_state
1911						.table
1912						.get_candidate(&candidate_hash)
1913						.ok_or(Error::CandidateNotFound)?
1914						.to_plain(),
1915					pov_hash: receipt.descriptor.pov_hash(),
1916					from_validator: statement.validator_index(),
1917					backing: Vec::new(),
1918				};
1919				rp_state.fallbacks.insert(summary.candidate, attesting.clone());
1920				attesting
1921			},
1922			StatementWithPVD::Valid(candidate_hash) => {
1923				if let Some(attesting) = rp_state.fallbacks.get_mut(candidate_hash) {
1924					let our_index = rp_state.table_context.validator.as_ref().map(|v| v.index());
1925					if our_index == Some(statement.validator_index()) {
1926						return Ok(());
1927					}
1928
1929					if rp_state.awaiting_validation.contains(candidate_hash) {
1930						// Job already running:
1931						attesting.backing.push(statement.validator_index());
1932						return Ok(());
1933					} else {
1934						// No job, so start another with current validator:
1935						attesting.from_validator = statement.validator_index();
1936						attesting.clone()
1937					}
1938				} else {
1939					return Ok(());
1940				}
1941			},
1942		};
1943
1944		// After `import_statement` succeeds, the candidate entry is guaranteed
1945		// to exist.
1946		if let Some(pvd) = state
1947			.per_candidate
1948			.get(&candidate_hash)
1949			.map(|pc| pc.persisted_validation_data.clone())
1950		{
1951			kick_off_validation_work(
1952				ctx,
1953				rp_state,
1954				pvd,
1955				&state.background_validation_tx,
1956				attesting,
1957			)
1958			.await?;
1959		}
1960	}
1961	Ok(())
1962}
1963
1964/// Kick off background validation with intent to second.
1965#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
1966async fn validate_and_second<Context>(
1967	ctx: &mut Context,
1968	rp_state: &mut PerRelayParentState,
1969	persisted_validation_data: PersistedValidationData,
1970	candidate: &CandidateReceipt,
1971	pov: Arc<PoV>,
1972	background_validation_tx: &mpsc::Sender<(Hash, ValidatedCandidateCommand)>,
1973) -> Result<(), Error> {
1974	let candidate_hash = candidate.hash();
1975
1976	gum::debug!(
1977		target: LOG_TARGET,
1978		candidate_hash = ?candidate_hash,
1979		candidate_receipt = ?candidate,
1980		"Validate and second candidate",
1981	);
1982
1983	let bg_sender = ctx.sender().clone();
1984	background_validate_and_make_available(
1985		ctx,
1986		rp_state,
1987		BackgroundValidationParams {
1988			sender: bg_sender,
1989			tx_command: background_validation_tx.clone(),
1990			candidate: candidate.clone(),
1991			relay_parent: rp_state.parent,
1992			node_features: rp_state.node_features.clone(),
1993			executor_params: Arc::clone(&rp_state.executor_params),
1994			persisted_validation_data,
1995			pov: PoVData::Ready(pov),
1996			n_validators: rp_state.table_context.validators.len(),
1997			make_command: ValidatedCandidateCommand::Second,
1998		},
1999	)
2000	.await?;
2001
2002	Ok(())
2003}
2004
2005#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2006async fn handle_second_message<Context>(
2007	ctx: &mut Context,
2008	state: &mut State,
2009	candidate: CandidateReceipt,
2010	persisted_validation_data: PersistedValidationData,
2011	pov: PoV,
2012	metrics: &Metrics,
2013) -> Result<(), Error> {
2014	let _timer = metrics.time_process_second();
2015
2016	let candidate_hash = candidate.hash();
2017	let relay_parent = candidate.descriptor().relay_parent();
2018
2019	if candidate.descriptor().persisted_validation_data_hash() != persisted_validation_data.hash() {
2020		gum::warn!(
2021			target: LOG_TARGET,
2022			?candidate_hash,
2023			"Candidate backing was asked to second candidate with wrong PVD",
2024		);
2025
2026		return Ok(());
2027	}
2028
2029	let rp_state = match state.per_relay_parent.get_mut(&relay_parent) {
2030		None => {
2031			gum::trace!(
2032				target: LOG_TARGET,
2033				?relay_parent,
2034				?candidate_hash,
2035				"We were asked to second a candidate outside of our view."
2036			);
2037
2038			return Ok(());
2039		},
2040		Some(r) => r,
2041	};
2042
2043	// Just return if the local validator is disabled. If we are here the local node should be a
2044	// validator but defensively use `unwrap_or(false)` to continue processing in this case.
2045	if rp_state.table_context.local_validator_is_disabled().unwrap_or(false) {
2046		gum::warn!(target: LOG_TARGET, "Local validator is disabled. Don't validate and second");
2047		return Ok(());
2048	}
2049
2050	let assigned_paras = rp_state.assigned_core.and_then(|core| rp_state.claim_queue.0.get(&core));
2051
2052	// Sanity check that candidate is from our assignment.
2053	if !matches!(assigned_paras, Some(paras) if paras.contains(&candidate.descriptor().para_id())) {
2054		gum::debug!(
2055			target: LOG_TARGET,
2056			our_assignment_core = ?rp_state.assigned_core,
2057			our_assignment_paras = ?assigned_paras,
2058			collation = ?candidate.descriptor().para_id(),
2059			"Subsystem asked to second for para outside of our assignment",
2060		);
2061		return Ok(());
2062	}
2063
2064	gum::debug!(
2065		target: LOG_TARGET,
2066		our_assignment_core = ?rp_state.assigned_core,
2067		our_assignment_paras = ?assigned_paras,
2068		collation = ?candidate.descriptor().para_id(),
2069		"Current assignments vs collation",
2070	);
2071
2072	// If the message is a `CandidateBackingMessage::Second`, sign and dispatch a
2073	// Seconded statement only if we have not signed a Valid statement for the requested candidate.
2074	//
2075	// The actual logic of issuing the signed statement checks that this isn't
2076	// conflicting with other seconded candidates. Not doing that check here
2077	// gives other subsystems the ability to get us to execute arbitrary candidates,
2078	// but no more.
2079	if !rp_state.issued_statements.contains(&candidate_hash) {
2080		let pov = Arc::new(pov);
2081
2082		validate_and_second(
2083			ctx,
2084			rp_state,
2085			persisted_validation_data,
2086			&candidate,
2087			pov,
2088			&state.background_validation_tx,
2089		)
2090		.await?;
2091	}
2092
2093	Ok(())
2094}
2095
2096#[overseer::contextbounds(CandidateBacking, prefix = self::overseer)]
2097async fn handle_statement_message<Context>(
2098	ctx: &mut Context,
2099	state: &mut State,
2100	relay_parent: Hash,
2101	statement: SignedFullStatementWithPVD,
2102	metrics: &Metrics,
2103) -> Result<(), Error> {
2104	let _timer = metrics.time_process_statement();
2105
2106	// Validator disabling is handled in `maybe_validate_and_import`
2107	match maybe_validate_and_import(ctx, state, relay_parent, statement).await {
2108		Err(Error::ValidationFailed(_)) => Ok(()),
2109		Err(e) => Err(e),
2110		Ok(()) => Ok(()),
2111	}
2112}
2113
2114fn handle_get_backable_candidates_message(
2115	state: &State,
2116	requested_candidates: HashMap<ParaId, Vec<(CandidateHash, Hash)>>,
2117	tx: oneshot::Sender<HashMap<ParaId, Vec<BackedCandidate>>>,
2118	metrics: &Metrics,
2119) -> Result<(), Error> {
2120	let _timer = metrics.time_get_backed_candidates();
2121
2122	let mut backed = HashMap::with_capacity(requested_candidates.len());
2123
2124	for (para_id, para_candidates) in requested_candidates {
2125		for (candidate_hash, relay_parent) in para_candidates.iter() {
2126			let rp_state = match state.per_relay_parent.get(&relay_parent) {
2127				Some(rp_state) => rp_state,
2128				None => {
2129					gum::debug!(
2130						target: LOG_TARGET,
2131						?relay_parent,
2132						?candidate_hash,
2133						"Requested candidate's relay parent is out of view",
2134					);
2135					break;
2136				},
2137			};
2138			let maybe_backed_candidate = rp_state
2139				.table
2140				.attested_candidate(
2141					candidate_hash,
2142					&rp_state.table_context,
2143					rp_state.minimum_backing_votes,
2144				)
2145				.and_then(|attested| table_attested_to_backed(attested, &rp_state.table_context));
2146
2147			if let Some(backed_candidate) = maybe_backed_candidate {
2148				backed
2149					.entry(para_id)
2150					.or_insert_with(|| Vec::with_capacity(para_candidates.len()))
2151					.push(backed_candidate);
2152			} else {
2153				break;
2154			}
2155		}
2156	}
2157
2158	tx.send(backed).map_err(|data| Error::Send(data))?;
2159	Ok(())
2160}