polkadot_runtime_parachains/paras_inherent/mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
//! Provides glue code over the scheduler and inclusion modules, and accepts one inherent per
//! block that can include new para candidates and bitfields.
//!
//! Unlike other modules in this crate, it does not need to be initialized by the initializer,
//! as it has no initialization logic and its finalization logic depends only on the details of
//! this module.
23
24use crate::{
25	configuration,
26	disputes::DisputesHandler,
27	inclusion::{self, CandidateCheckContext},
28	initializer,
29	metrics::METRICS,
30	paras, scheduler,
31	shared::{self, AllowedRelayParentsTracker},
32	ParaId,
33};
34use alloc::{
35	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
36	vec,
37	vec::Vec,
38};
39use bitvec::prelude::BitVec;
40use core::result::Result;
41use frame_support::{
42	defensive,
43	dispatch::{DispatchErrorWithPostInfo, PostDispatchInfo},
44	inherent::{InherentData, InherentIdentifier, MakeFatalError, ProvideInherent},
45	pallet_prelude::*,
46	traits::Randomness,
47};
48
49use frame_system::pallet_prelude::*;
50use pallet_babe::{self, ParentBlockRandomness};
51use polkadot_primitives::{
52	effective_minimum_backing_votes,
53	node_features::FeatureIndex,
54	vstaging::{
55		BackedCandidate, CandidateDescriptorVersion, CandidateReceiptV2 as CandidateReceipt,
56		InherentData as ParachainsInherentData, ScrapedOnChainVotes,
57	},
58	CandidateHash, CheckedDisputeStatementSet, CheckedMultiDisputeStatementSet, CoreIndex,
59	DisputeStatementSet, HeadData, MultiDisputeStatementSet, SessionIndex,
60	SignedAvailabilityBitfields, SigningContext, UncheckedSignedAvailabilityBitfield,
61	UncheckedSignedAvailabilityBitfields, ValidatorId, ValidatorIndex, ValidityAttestation,
62	PARACHAINS_INHERENT_IDENTIFIER,
63};
64use rand::{seq::SliceRandom, SeedableRng};
65use scale_info::TypeInfo;
66use sp_runtime::traits::{Header as HeaderT, One};
67
68mod misc;
69mod weights;
70
71use self::weights::checked_multi_dispute_statement_sets_weight;
72pub use self::{
73	misc::{IndexedRetain, IsSortedBy},
74	weights::{
75		backed_candidate_weight, backed_candidates_weight, dispute_statement_set_weight,
76		multi_dispute_statement_sets_weight, paras_inherent_total_weight, signed_bitfield_weight,
77		signed_bitfields_weight, TestWeightInfo, WeightInfo,
78	},
79};
80
81#[cfg(feature = "runtime-benchmarks")]
82mod benchmarking;
83
84#[cfg(test)]
85mod tests;
86
87const LOG_TARGET: &str = "runtime::inclusion-inherent";
88
/// A bitfield concerning concluded disputes of candidates, where each bit position corresponds
/// to the index of the core the candidate occupied.
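///
/// For example, with 4 availability cores and a dispute having just concluded against the
/// candidate occupying core 2, only the bit at index 2 is set (`0b0100` when written MSB-first).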
91#[derive(Default, PartialEq, Eq, Clone, Encode, Decode, RuntimeDebug, TypeInfo)]
92pub(crate) struct DisputedBitfield(pub(crate) BitVec<u8, bitvec::order::Lsb0>);
93
94impl From<BitVec<u8, bitvec::order::Lsb0>> for DisputedBitfield {
95	fn from(inner: BitVec<u8, bitvec::order::Lsb0>) -> Self {
96		Self(inner)
97	}
98}
99
100#[cfg(test)]
101impl DisputedBitfield {
102	/// Create a new bitfield, where each bit is set to `false`.
103	pub fn zeros(n: usize) -> Self {
104		Self::from(BitVec::<u8, bitvec::order::Lsb0>::repeat(false, n))
105	}
106}
107
108pub use pallet::*;
109
110#[frame_support::pallet]
111pub mod pallet {
112	use super::*;
113
114	#[pallet::pallet]
115	#[pallet::without_storage_info]
116	pub struct Pallet<T>(_);
117
118	#[pallet::config]
119	#[pallet::disable_frame_system_supertrait_check]
120	pub trait Config:
121		inclusion::Config + scheduler::Config + initializer::Config + pallet_babe::Config
122	{
123		/// Weight information for extrinsics in this pallet.
124		type WeightInfo: WeightInfo;
125	}
126
127	#[pallet::error]
128	pub enum Error<T> {
129		/// Inclusion inherent called more than once per block.
130		TooManyInclusionInherents,
131		/// The hash of the submitted parent header doesn't correspond to the saved block hash of
132		/// the parent.
133		InvalidParentHeader,
134		/// Inherent data was filtered during execution. This should have only been done
135		/// during creation.
136		InherentDataFilteredDuringExecution,
		/// More backed candidates were supplied than there are eligible cores, i.e. at least one
		/// candidate is unscheduled.
		UnscheduledCandidate,
139	}
140
141	/// Whether the paras inherent was included within this block.
142	///
143	/// The `Option<()>` is effectively a `bool`, but it never hits storage in the `None` variant
144	/// due to the guarantees of FRAME's storage APIs.
145	///
146	/// If this is `None` at the end of the block, we panic and render the block invalid.
147	#[pallet::storage]
148	pub(crate) type Included<T> = StorageValue<_, ()>;
149
150	/// Scraped on chain data for extracting resolved disputes as well as backing votes.
151	#[pallet::storage]
152	pub type OnChainVotes<T: Config> = StorageValue<_, ScrapedOnChainVotes<T::Hash>>;
153
154	/// Update the disputes statements set part of the on-chain votes.
155	pub(crate) fn set_scrapable_on_chain_disputes<T: Config>(
156		session: SessionIndex,
157		checked_disputes: CheckedMultiDisputeStatementSet,
158	) {
159		crate::paras_inherent::OnChainVotes::<T>::mutate(move |value| {
160			let disputes =
161				checked_disputes.into_iter().map(DisputeStatementSet::from).collect::<Vec<_>>();
162			let backing_validators_per_candidate = match value.take() {
163				Some(v) => v.backing_validators_per_candidate,
164				None => Vec::new(),
165			};
166			*value = Some(ScrapedOnChainVotes::<T::Hash> {
167				backing_validators_per_candidate,
168				disputes,
169				session,
170			});
171		})
172	}
173
174	/// Update the backing votes including part of the on-chain votes.
175	pub(crate) fn set_scrapable_on_chain_backings<T: Config>(
176		session: SessionIndex,
177		backing_validators_per_candidate: Vec<(
178			CandidateReceipt<T::Hash>,
179			Vec<(ValidatorIndex, ValidityAttestation)>,
180		)>,
181	) {
182		crate::paras_inherent::OnChainVotes::<T>::mutate(move |value| {
183			let disputes = match value.take() {
184				Some(v) => v.disputes,
185				None => MultiDisputeStatementSet::default(),
186			};
187			*value = Some(ScrapedOnChainVotes::<T::Hash> {
188				backing_validators_per_candidate,
189				disputes,
190				session,
191			});
192		})
193	}
194
195	#[pallet::hooks]
196	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
197		fn on_initialize(_: BlockNumberFor<T>) -> Weight {
198			T::DbWeight::get().reads_writes(1, 1) // in `on_finalize`.
199		}
200
201		fn on_finalize(_: BlockNumberFor<T>) {
202			if Included::<T>::take().is_none() {
203				panic!("Bitfields and heads must be included every block");
204			}
205		}
206	}
207
208	#[pallet::inherent]
209	impl<T: Config> ProvideInherent for Pallet<T> {
210		type Call = Call<T>;
211		type Error = MakeFatalError<()>;
212		const INHERENT_IDENTIFIER: InherentIdentifier = PARACHAINS_INHERENT_IDENTIFIER;
213
214		fn create_inherent(data: &InherentData) -> Option<Self::Call> {
215			let inherent_data = Self::create_inherent_inner(data)?;
216
217			Some(Call::enter { data: inherent_data })
218		}
219
220		fn is_inherent(call: &Self::Call) -> bool {
221			matches!(call, Call::enter { .. })
222		}
223	}
224
225	#[pallet::call]
226	impl<T: Config> Pallet<T> {
227		/// Enter the paras inherent. This will process bitfields and backed candidates.
228		#[pallet::call_index(0)]
229		#[pallet::weight((
230			paras_inherent_total_weight::<T>(
231				data.backed_candidates.as_slice(),
232				&data.bitfields,
233				&data.disputes,
234			),
235			DispatchClass::Mandatory,
236		))]
237		pub fn enter(
238			origin: OriginFor<T>,
239			data: ParachainsInherentData<HeaderFor<T>>,
240		) -> DispatchResultWithPostInfo {
241			ensure_none(origin)?;
242
243			ensure!(!Included::<T>::exists(), Error::<T>::TooManyInclusionInherents);
244			Included::<T>::set(Some(()));
245			let initial_data = data.clone();
246
247			Self::process_inherent_data(data).and_then(|(processed, post_info)| {
248				ensure!(initial_data == processed, Error::<T>::InherentDataFilteredDuringExecution);
249				Ok(post_info)
250			})
251		}
252	}
253}
254
255impl<T: Config> Pallet<T> {
256	/// Create the `ParachainsInherentData` that gets passed to [`Self::enter`] in
257	/// [`Self::create_inherent`]. This code is pulled out of [`Self::create_inherent`] so it can be
258	/// unit tested.
259	fn create_inherent_inner(data: &InherentData) -> Option<ParachainsInherentData<HeaderFor<T>>> {
260		let parachains_inherent_data = match data.get_data(&Self::INHERENT_IDENTIFIER) {
261			Ok(Some(d)) => d,
262			Ok(None) => return None,
263			Err(_) => {
264				log::warn!(target: LOG_TARGET, "ParachainsInherentData failed to decode");
265				return None
266			},
267		};
268		match Self::process_inherent_data(parachains_inherent_data) {
269			Ok((processed, _)) => Some(processed),
270			Err(err) => {
271				log::warn!(target: LOG_TARGET, "Processing inherent data failed: {:?}", err);
272				None
273			},
274		}
275	}
276
277	/// Process inherent data.
278	///
279	/// The given inherent data is processed and state is altered accordingly. If any data could
280	/// not be applied (inconsistencies, weight limit, ...) it is removed.
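	///
	/// Roughly, the steps are:
	/// 1. Verify the parent header and update the allowed relay parents tracker.
	/// 2. Deduplicate, sanitize and weight-limit the dispute statement sets, then limit the
	///    bitfields and backed candidates to the remaining block weight.
	/// 3. Import the disputes; if the chain is frozen, stop here and include only the disputes.
	/// 4. Free the cores of candidates that concluded invalid, then sanitize and process the
	///    availability bitfields, enacting candidates that became available and freeing
	///    timed-out cores.
	/// 5. Sanitize and back the remaining candidates and advance the claim queue.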
281	///
282	/// Returns: Result containing processed inherent data and weight, the processed inherent would
283	/// consume.
284	fn process_inherent_data(
285		data: ParachainsInherentData<HeaderFor<T>>,
286	) -> Result<(ParachainsInherentData<HeaderFor<T>>, PostDispatchInfo), DispatchErrorWithPostInfo>
287	{
288		#[cfg(feature = "runtime-metrics")]
289		sp_io::init_tracing();
290
291		let ParachainsInherentData {
292			mut bitfields,
293			mut backed_candidates,
294			parent_header,
295			mut disputes,
296		} = data;
297
298		log::debug!(
299			target: LOG_TARGET,
300			"[process_inherent_data] bitfields.len(): {}, backed_candidates.len(): {}, disputes.len() {}",
301			bitfields.len(),
302			backed_candidates.len(),
303			disputes.len()
304		);
305
306		let parent_hash = frame_system::Pallet::<T>::parent_hash();
307
308		ensure!(
309			parent_header.hash().as_ref() == parent_hash.as_ref(),
310			Error::<T>::InvalidParentHeader,
311		);
312
313		let now = frame_system::Pallet::<T>::block_number();
314		let config = configuration::ActiveConfig::<T>::get();
315
316		// Before anything else, update the allowed relay-parents.
317		{
318			let parent_number = now - One::one();
319			let parent_storage_root = *parent_header.state_root();
320
321			shared::AllowedRelayParents::<T>::mutate(|tracker| {
322				tracker.update(
323					parent_hash,
324					parent_storage_root,
325					scheduler::ClaimQueue::<T>::get()
326						.into_iter()
327						.map(|(core_index, paras)| {
328							(core_index, paras.into_iter().map(|e| e.para_id()).collect())
329						})
330						.collect(),
331					parent_number,
332					config.scheduler_params.lookahead,
333				);
334			});
335		}
336
337		let candidates_weight = backed_candidates_weight::<T>(&backed_candidates);
338		let bitfields_weight = signed_bitfields_weight::<T>(&bitfields);
339		let disputes_weight = multi_dispute_statement_sets_weight::<T>(&disputes);
340
341		// Weight before filtering/sanitization except for enacting the candidates
342		let weight_before_filtering = candidates_weight + bitfields_weight + disputes_weight;
343
344		METRICS.on_before_filter(weight_before_filtering.ref_time());
345		log::debug!(target: LOG_TARGET, "Size before filter: {}, candidates + bitfields: {}, disputes: {}", weight_before_filtering.proof_size(), candidates_weight.proof_size() + bitfields_weight.proof_size(), disputes_weight.proof_size());
346		log::debug!(target: LOG_TARGET, "Time weight before filter: {}, candidates + bitfields: {}, disputes: {}", weight_before_filtering.ref_time(), candidates_weight.ref_time() + bitfields_weight.ref_time(), disputes_weight.ref_time());
347
348		let current_session = shared::CurrentSessionIndex::<T>::get();
349		let expected_bits = scheduler::Pallet::<T>::num_availability_cores();
350		let validator_public = shared::ActiveValidatorKeys::<T>::get();
351
		// We are assuming (incorrectly) that we have all the weight (for the mandatory class or even
		// the full block) available to us. This can lead to slightly overweight blocks, which still
		// works as the dispatch class for `enter` is `Mandatory`. By using the `Mandatory`
		// dispatch class, the upper layers impose no limit on the weight of this inherent; instead
		// we limit ourselves and make sure to stay within reasonable bounds. It might make sense
		// to subtract `BlockWeights::base_block` to reduce the chances of becoming overweight.
358		let max_block_weight = {
359			let dispatch_class = DispatchClass::Mandatory;
360			let max_block_weight_full = <T as frame_system::Config>::BlockWeights::get();
361			log::debug!(target: LOG_TARGET, "Max block weight: {}", max_block_weight_full.max_block);
362			// Get max block weight for the mandatory class if defined, otherwise total max weight
363			// of the block.
364			let max_weight = max_block_weight_full
365				.per_class
366				.get(dispatch_class)
367				.max_total
368				.unwrap_or(max_block_weight_full.max_block);
369			log::debug!(target: LOG_TARGET, "Used max block time weight: {}", max_weight);
370
371			let max_block_size_full = <T as frame_system::Config>::BlockLength::get();
372			let max_block_size = max_block_size_full.max.get(dispatch_class);
373			log::debug!(target: LOG_TARGET, "Used max block size: {}", max_block_size);
374
375			// Adjust proof size to max block size as we are tracking tx size.
376			max_weight.set_proof_size(*max_block_size as u64)
377		};
378		log::debug!(target: LOG_TARGET, "Used max block weight: {}", max_block_weight);
379
380		let entropy = compute_entropy::<T>(parent_hash);
381		let mut rng = rand_chacha::ChaChaRng::from_seed(entropy.into());
382
383		// Filter out duplicates and continue.
384		if let Err(()) = T::DisputesHandler::deduplicate_and_sort_dispute_data(&mut disputes) {
385			log::debug!(target: LOG_TARGET, "Found duplicate statement sets, retaining the first");
386		}
387
388		let post_conclusion_acceptance_period = config.dispute_post_conclusion_acceptance_period;
389
390		let dispute_statement_set_valid = move |set: DisputeStatementSet| {
391			T::DisputesHandler::filter_dispute_data(set, post_conclusion_acceptance_period)
392		};
393
394		// Limit the disputes first, since the following statements depend on the votes included
395		// here.
396		let (checked_disputes_sets, checked_disputes_sets_consumed_weight) =
397			limit_and_sanitize_disputes::<T, _>(
398				disputes,
399				dispute_statement_set_valid,
400				max_block_weight,
401			);
402
403		let mut all_weight_after = {
404			// Assure the maximum block weight is adhered, by limiting bitfields and backed
405			// candidates. Dispute statement sets were already limited before.
406			let non_disputes_weight = apply_weight_limit::<T>(
407				&mut backed_candidates,
408				&mut bitfields,
409				max_block_weight.saturating_sub(checked_disputes_sets_consumed_weight),
410				&mut rng,
411			);
412
413			let all_weight_after =
414				non_disputes_weight.saturating_add(checked_disputes_sets_consumed_weight);
415
416			METRICS.on_after_filter(all_weight_after.ref_time());
417			log::debug!(
418				target: LOG_TARGET,
419				"[process_inherent_data] after filter: bitfields.len(): {}, backed_candidates.len(): {}, checked_disputes_sets.len() {}",
420				bitfields.len(),
421				backed_candidates.len(),
422				checked_disputes_sets.len()
423			);
424			log::debug!(target: LOG_TARGET, "Size after filter: {}, candidates + bitfields: {}, disputes: {}", all_weight_after.proof_size(), non_disputes_weight.proof_size(), checked_disputes_sets_consumed_weight.proof_size());
425			log::debug!(target: LOG_TARGET, "Time weight after filter: {}, candidates + bitfields: {}, disputes: {}", all_weight_after.ref_time(), non_disputes_weight.ref_time(), checked_disputes_sets_consumed_weight.ref_time());
426
427			if all_weight_after.any_gt(max_block_weight) {
428				log::warn!(target: LOG_TARGET, "Post weight limiting weight is still too large, time: {}, size: {}", all_weight_after.ref_time(), all_weight_after.proof_size());
429			}
430			all_weight_after
431		};
432
		// Note that `process_checked_multi_dispute_data` will iterate over and import each
		// dispute, so the input here must be reasonably bounded, which is guaranteed by the
		// checks and weight limitation above.
		// We don't care whether a dispute is fresh or not: the call writes the disputes to
		// storage, so they can be queried from there later. If it fails for whatever reason,
		// that's ok.
439		if let Err(e) =
440			T::DisputesHandler::process_checked_multi_dispute_data(&checked_disputes_sets)
441		{
442			log::warn!(target: LOG_TARGET, "MultiDisputesData failed to update: {:?}", e);
443		};
444		METRICS.on_disputes_imported(checked_disputes_sets.len() as u64);
445
446		set_scrapable_on_chain_disputes::<T>(current_session, checked_disputes_sets.clone());
447
448		if T::DisputesHandler::is_frozen() {
449			// Relay chain freeze, at this point we will not include any parachain blocks.
450			METRICS.on_relay_chain_freeze();
451
452			let disputes = checked_disputes_sets
453				.into_iter()
454				.map(|checked| checked.into())
455				.collect::<Vec<_>>();
456			let processed = ParachainsInherentData {
457				bitfields: Vec::new(),
458				backed_candidates: Vec::new(),
459				disputes,
460				parent_header,
461			};
462
463			// The relay chain we are currently on is invalid. Proceed no further on parachains.
464			return Ok((processed, Some(checked_disputes_sets_consumed_weight).into()))
465		}
466
		// Contains the disputes that have concluded in the current session only,
		// since these are the only ones that are relevant for the occupied cores,
		// which lightens the load on `free_disputed` significantly.
		// Cores can't be occupied with candidates of previous sessions, and only
		// things with new votes can have just concluded. We only need to collect
		// cores with disputes that conclude just now, because disputes that
		// concluded longer ago have already had any corresponding cores cleaned up.
474		let current_concluded_invalid_disputes = checked_disputes_sets
475			.iter()
476			.map(AsRef::as_ref)
477			.filter(|dss| dss.session == current_session)
478			.map(|dss| (dss.session, dss.candidate_hash))
479			.filter(|(session, candidate)| {
480				<T>::DisputesHandler::concluded_invalid(*session, *candidate)
481			})
482			.map(|(_session, candidate)| candidate)
483			.collect::<BTreeSet<CandidateHash>>();
484
485		// Get the cores freed as a result of concluded invalid candidates.
486		let (freed_disputed, concluded_invalid_hashes): (Vec<CoreIndex>, BTreeSet<CandidateHash>) =
487			inclusion::Pallet::<T>::free_disputed(&current_concluded_invalid_disputes)
488				.into_iter()
489				.unzip();
490
		// Create a bitfield from the set of core indices, where each set bit corresponds to
		// a core index that was freed due to a dispute.
		//
		// I.e. `010100` (bit 0 leftmost) would indicate that the candidates on cores 1 and 3
		// are disputed.
495		let disputed_bitfield = create_disputed_bitfield(expected_bits, freed_disputed.iter());
496
497		let bitfields = sanitize_bitfields::<T>(
498			bitfields,
499			disputed_bitfield,
500			expected_bits,
501			parent_hash,
502			current_session,
503			&validator_public[..],
504		);
505		METRICS.on_bitfields_processed(bitfields.len() as u64);
506
507		// Process new availability bitfields, yielding any availability cores whose
508		// work has now concluded.
509		let (enact_weight, freed_concluded) =
510			inclusion::Pallet::<T>::update_pending_availability_and_get_freed_cores(
511				&validator_public[..],
512				bitfields.clone(),
513			);
514		all_weight_after.saturating_accrue(enact_weight);
515		log::debug!(
516			target: LOG_TARGET,
517			"Enacting weight: {}, all weight: {}",
518			enact_weight.ref_time(),
519			all_weight_after.ref_time(),
520		);
521
		// It's possible that after enacting the candidates, the total weight
		// goes over the limit; however, we can't do anything about it at this point.
		// By using the `Mandatory` weight, we ensure the block is still accepted,
		// but no other (user) transactions can be included.
526		if all_weight_after.any_gt(max_block_weight) {
527			log::warn!(
528				target: LOG_TARGET,
529				"Overweight para inherent data after enacting the candidates {:?}: {} > {}",
530				parent_hash,
531				all_weight_after,
532				max_block_weight,
533			);
534		}
535
536		// Inform the disputes module of all included candidates.
537		for (_, candidate_hash) in &freed_concluded {
538			T::DisputesHandler::note_included(current_session, *candidate_hash, now);
539		}
540
541		METRICS.on_candidates_included(freed_concluded.len() as u64);
542
543		// Get the timed out candidates
544		let freed_timeout = if scheduler::Pallet::<T>::availability_timeout_check_required() {
545			inclusion::Pallet::<T>::free_timedout()
546		} else {
547			Vec::new()
548		};
549
550		if !freed_timeout.is_empty() {
551			log::debug!(target: LOG_TARGET, "Evicted timed out cores: {:?}", freed_timeout);
552		}
553
554		// Back candidates.
555		let (candidate_receipt_with_backing_validator_indices, backed_candidates_with_core) =
556			Self::back_candidates(concluded_invalid_hashes, backed_candidates)?;
557
558		set_scrapable_on_chain_backings::<T>(
559			current_session,
560			candidate_receipt_with_backing_validator_indices,
561		);
562
563		let disputes = checked_disputes_sets
564			.into_iter()
565			.map(|checked| checked.into())
566			.collect::<Vec<_>>();
567
568		let bitfields = bitfields.into_iter().map(|v| v.into_unchecked()).collect();
569
570		let count = backed_candidates_with_core.len();
571		let processed = ParachainsInherentData {
572			bitfields,
573			backed_candidates: backed_candidates_with_core.into_iter().fold(
574				Vec::with_capacity(count),
575				|mut acc, (_id, candidates)| {
576					acc.extend(candidates.into_iter().map(|(c, _)| c));
577					acc
578				},
579			),
580			disputes,
581			parent_header,
582		};
583		Ok((processed, Some(all_weight_after).into()))
584	}
585
586	fn back_candidates(
587		concluded_invalid_hashes: BTreeSet<CandidateHash>,
588		backed_candidates: Vec<BackedCandidate<T::Hash>>,
589	) -> Result<
590		(
591			Vec<(CandidateReceipt<T::Hash>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
592			BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
593		),
594		DispatchErrorWithPostInfo,
595	> {
596		let allowed_relay_parents = shared::AllowedRelayParents::<T>::get();
597		let upcoming_new_session = initializer::Pallet::<T>::upcoming_session_change();
598
599		METRICS.on_candidates_processed_total(backed_candidates.len() as u64);
600
601		if !upcoming_new_session {
602			let occupied_cores =
603				inclusion::Pallet::<T>::get_occupied_cores().map(|(core, _)| core).collect();
604
605			let mut eligible: BTreeMap<ParaId, BTreeSet<CoreIndex>> = BTreeMap::new();
606			let mut total_eligible_cores = 0;
607
608			for (core_idx, para_id) in Self::eligible_paras(&occupied_cores) {
609				total_eligible_cores += 1;
610				log::trace!(target: LOG_TARGET, "Found eligible para {:?} on core {:?}", para_id, core_idx);
611				eligible.entry(para_id).or_default().insert(core_idx);
612			}
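			// At this point `eligible` maps each para id to the set of free cores on which it is
			// next in the claim queue, and `total_eligible_cores` bounds how many candidates can
			// be backed in this block.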
613
614			let node_features = configuration::ActiveConfig::<T>::get().node_features;
615
616			let allow_v2_receipts = node_features
617				.get(FeatureIndex::CandidateReceiptV2 as usize)
618				.map(|b| *b)
619				.unwrap_or(false);
620
621			let backed_candidates_with_core = sanitize_backed_candidates::<T>(
622				backed_candidates,
623				&allowed_relay_parents,
624				concluded_invalid_hashes,
625				eligible,
626				allow_v2_receipts,
627			);
628			let count = count_backed_candidates(&backed_candidates_with_core);
629
630			ensure!(count <= total_eligible_cores, Error::<T>::UnscheduledCandidate);
631
632			METRICS.on_candidates_sanitized(count as u64);
633
634			// Process backed candidates according to scheduled cores.
635			let candidate_receipt_with_backing_validator_indices =
636				inclusion::Pallet::<T>::process_candidates(
637					&allowed_relay_parents,
638					&backed_candidates_with_core,
639					scheduler::Pallet::<T>::group_validators,
640				)?;
641
642			// We need to advance the claim queue on all cores, except for the ones that did not
643			// get freed in this block. The ones that did not get freed also cannot be newly
644			// occupied.
645			scheduler::Pallet::<T>::advance_claim_queue(&occupied_cores);
646
647			Ok((candidate_receipt_with_backing_validator_indices, backed_candidates_with_core))
648		} else {
649			log::debug!(
650				target: LOG_TARGET,
651				"Upcoming session change, not backing any new candidates."
652			);
653			// If we'll initialize a new session at the end of the block, we don't want to
654			// advance the claim queue.
655
656			Ok((vec![], BTreeMap::new()))
657		}
658	}
659
	/// Paras that may get backed on cores.
	///
	/// 1. The para must be scheduled on a core.
	/// 2. The core needs to be free, otherwise backing is not possible.
	///
	/// We get the set of occupied cores as input.
666	pub(crate) fn eligible_paras<'a>(
667		occupied_cores: &'a BTreeSet<CoreIndex>,
668	) -> impl Iterator<Item = (CoreIndex, ParaId)> + 'a {
669		scheduler::ClaimQueue::<T>::get().into_iter().filter_map(|(core_idx, queue)| {
670			if occupied_cores.contains(&core_idx) {
671				return None
672			}
673			let next_scheduled = queue.front()?;
674			Some((core_idx, next_scheduled.para_id()))
675		})
676	}
677}
678
/// Derive a bitfield of disputed cores from the given freed core indices.
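///
/// E.g. with `expected_bits = 6` and freed cores {1, 3}, the resulting bitfield has bits 1 and 3
/// set (`0b001010` when written MSB-first).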
680pub(super) fn create_disputed_bitfield<'a, I>(
681	expected_bits: usize,
682	freed_cores: I,
683) -> DisputedBitfield
684where
685	I: 'a + IntoIterator<Item = &'a CoreIndex>,
686{
687	let mut bitvec = BitVec::repeat(false, expected_bits);
688	for core_idx in freed_cores {
689		let core_idx = core_idx.0 as usize;
690		if core_idx < expected_bits {
691			bitvec.set(core_idx, true);
692		}
693	}
694	DisputedBitfield::from(bitvec)
695}
696
697/// Select a random subset, with preference for certain indices.
698///
699/// Adds random items to the set until all candidates
700/// are tried or the remaining weight is depleted.
701///
702/// Returns the weight of all selected items from `selectables`
703/// as well as their indices in ascending order.
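///
/// E.g. with items weighing `3`, `3` and `3`, no preferred indices and a weight limit of `7`, two
/// of the three indices are returned (picked at random, in ascending order) together with an
/// accumulated weight of `6`.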
704fn random_sel<X, F: Fn(&X) -> Weight>(
705	rng: &mut rand_chacha::ChaChaRng,
706	selectables: &[X],
707	mut preferred_indices: Vec<usize>,
708	weight_fn: F,
709	weight_limit: Weight,
710) -> (Weight, Vec<usize>) {
711	if selectables.is_empty() {
712		return (Weight::zero(), Vec::new())
713	}
714	// all indices that are not part of the preferred set
715	let mut indices = (0..selectables.len())
716		.into_iter()
717		.filter(|idx| !preferred_indices.contains(idx))
718		.collect::<Vec<_>>();
719	let mut picked_indices = Vec::with_capacity(selectables.len().saturating_sub(1));
720
721	let mut weight_acc = Weight::zero();
722
723	preferred_indices.shuffle(rng);
724	for preferred_idx in preferred_indices {
725		// preferred indices originate from outside
726		if let Some(item) = selectables.get(preferred_idx) {
727			let updated = weight_acc.saturating_add(weight_fn(item));
728			if updated.any_gt(weight_limit) {
729				continue
730			}
731			weight_acc = updated;
732			picked_indices.push(preferred_idx);
733		}
734	}
735
736	indices.shuffle(rng);
737	for idx in indices {
738		let item = &selectables[idx];
739		let updated = weight_acc.saturating_add(weight_fn(item));
740
741		if updated.any_gt(weight_limit) {
742			continue
743		}
744		weight_acc = updated;
745
746		picked_indices.push(idx);
747	}
748
749	// sorting indices, so the ordering is retained
750	// unstable sorting is fine, since there are no duplicates in indices
751	// and even if there were, they don't have an identity
752	picked_indices.sort_unstable();
753	(weight_acc, picked_indices)
754}
755
756/// Considers an upper threshold that the inherent data must not exceed.
757///
758/// If there is sufficient space, all bitfields and all candidates
759/// will be included.
760///
/// Otherwise tries to include all disputes, and then tries to fill the remaining space with
/// bitfields and then candidates.
///
/// The selection process is random. There are two exceptions: for backed candidates, chains
/// containing a code upgrade are preferred, since with an increasing number of parachains their
/// chances of inclusion would otherwise become slim; and for disputes, local and older disputes
/// are preferred (see `limit_and_sanitize_disputes`). All backed candidates are checked
/// beforehand in `fn create_inherent_inner`, which guarantees sanity.
///
/// Assumes disputes are already filtered by the time this is called.
///
/// Returns the total weight consumed by `bitfields` and `candidates`.
773pub(crate) fn apply_weight_limit<T: Config + inclusion::Config>(
774	candidates: &mut Vec<BackedCandidate<<T>::Hash>>,
775	bitfields: &mut UncheckedSignedAvailabilityBitfields,
776	max_consumable_weight: Weight,
777	rng: &mut rand_chacha::ChaChaRng,
778) -> Weight {
779	let total_candidates_weight = backed_candidates_weight::<T>(candidates.as_slice());
780
781	let total_bitfields_weight = signed_bitfields_weight::<T>(&bitfields);
782
783	let total = total_bitfields_weight.saturating_add(total_candidates_weight);
784
785	// candidates + bitfields fit into the block
786	if max_consumable_weight.all_gte(total) {
787		return total
788	}
789
	// Invariant: the block author provides candidates in the order in which they form a chain
	// wrt elastic scaling. If the invariant is broken, we'd fail later when filtering candidates
	// which are unchained.
793
794	let mut chained_candidates: Vec<Vec<_>> = Vec::new();
795	let mut current_para_id = None;
796
797	for candidate in core::mem::take(candidates).into_iter() {
798		let candidate_para_id = candidate.descriptor().para_id();
799		if Some(candidate_para_id) == current_para_id {
800			let chain = chained_candidates
801				.last_mut()
802				.expect("if the current_para_id is Some, then vec is not empty; qed");
803			chain.push(candidate);
804		} else {
805			current_para_id = Some(candidate_para_id);
806			chained_candidates.push(vec![candidate]);
807		}
808	}
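	// E.g. an input of [A1, A2, B1, A3] is grouped into the chains [[A1, A2], [B1], [A3]], since
	// only consecutive candidates of the same para are treated as one chain.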
809
	// Elastic scaling: we prefer chains that have a code upgrade among their candidates,
	// as the candidates containing the upgrade tend to be large and hence would stand little
	// chance of being picked late while still fitting within the weight bounds.
	//
	// Limitation: for simplicity, if the total weight of a chain of candidates is larger than
	// the remaining weight, the whole chain is dropped, even though it might have been possible
	// to include part of it.
817	let preferred_chain_indices = chained_candidates
818		.iter()
819		.enumerate()
820		.filter_map(|(idx, candidates)| {
			// Check if any of the candidates in the chain contains a code upgrade.
822			if candidates
823				.iter()
824				.any(|candidate| candidate.candidate().commitments.new_validation_code.is_some())
825			{
826				Some(idx)
827			} else {
828				None
829			}
830		})
831		.collect::<Vec<usize>>();
832
833	// There is weight remaining to be consumed by a subset of chained candidates
834	// which are going to be picked now.
835	if let Some(max_consumable_by_candidates) =
836		max_consumable_weight.checked_sub(&total_bitfields_weight)
837	{
838		let (acc_candidate_weight, chained_indices) =
839			random_sel::<Vec<BackedCandidate<<T as frame_system::Config>::Hash>>, _>(
840				rng,
841				&chained_candidates,
842				preferred_chain_indices,
843				|candidates| backed_candidates_weight::<T>(&candidates),
844				max_consumable_by_candidates,
845			);
846		log::debug!(target: LOG_TARGET, "Indices Candidates: {:?}, size: {}", chained_indices, candidates.len());
847		chained_candidates
848			.indexed_retain(|idx, _backed_candidates| chained_indices.binary_search(&idx).is_ok());
849		// pick all bitfields, and
850		// fill the remaining space with candidates
851		let total_consumed = acc_candidate_weight.saturating_add(total_bitfields_weight);
852
853		*candidates = chained_candidates.into_iter().flatten().collect::<Vec<_>>();
854
855		return total_consumed
856	}
857
858	candidates.clear();
859
860	// insufficient space for even the bitfields alone, so only try to fit as many of those
861	// into the block and skip the candidates entirely
862	let (total_consumed, indices) = random_sel::<UncheckedSignedAvailabilityBitfield, _>(
863		rng,
864		&bitfields,
865		vec![],
866		|bitfield| signed_bitfield_weight::<T>(&bitfield),
867		max_consumable_weight,
868	);
869	log::debug!(target: LOG_TARGET, "Indices Bitfields: {:?}, size: {}", indices, bitfields.len());
870
871	bitfields.indexed_retain(|idx, _bitfield| indices.binary_search(&idx).is_ok());
872
873	total_consumed
874}
875
/// Filter bitfields based on freed core indices, validity, and other sanity checks.
///
/// Do sanity checks on the bitfields:
///
///  1. no more than one bitfield per validator
///  2. bitfields are ascending by validator index
///  3. each bitfield has exactly `expected_bits`
///  4. the signature is valid
///  5. no disputed core indices are set
///
/// If any of these checks fails, the bitfield is dropped.
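///
/// For example, with `expected_bits = 4` a bitfield of length 5 is dropped (check 3), as is a
/// bitfield whose validator index is not strictly greater than that of the preceding bitfield
/// (checks 1 and 2).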
887pub(crate) fn sanitize_bitfields<T: crate::inclusion::Config>(
888	unchecked_bitfields: UncheckedSignedAvailabilityBitfields,
889	disputed_bitfield: DisputedBitfield,
890	expected_bits: usize,
891	parent_hash: T::Hash,
892	session_index: SessionIndex,
893	validators: &[ValidatorId],
894) -> SignedAvailabilityBitfields {
895	let mut bitfields = Vec::with_capacity(unchecked_bitfields.len());
896
897	let mut last_index: Option<ValidatorIndex> = None;
898
899	if disputed_bitfield.0.len() != expected_bits {
900		// This is a system logic error that should never occur, but we want to handle it gracefully
901		// so we just drop all bitfields
902		log::error!(target: LOG_TARGET, "BUG: disputed_bitfield != expected_bits");
903		return vec![]
904	}
905
906	let all_zeros = BitVec::<u8, bitvec::order::Lsb0>::repeat(false, expected_bits);
907	let signing_context = SigningContext { parent_hash, session_index };
908	for unchecked_bitfield in unchecked_bitfields {
909		// Find and skip invalid bitfields.
910		if unchecked_bitfield.unchecked_payload().0.len() != expected_bits {
911			log::trace!(
912				target: LOG_TARGET,
913				"bad bitfield length: {} != {:?}",
914				unchecked_bitfield.unchecked_payload().0.len(),
915				expected_bits,
916			);
917			continue
918		}
919
920		if unchecked_bitfield.unchecked_payload().0.clone() & disputed_bitfield.0.clone() !=
921			all_zeros
922		{
923			log::trace!(
924				target: LOG_TARGET,
925				"bitfield contains disputed cores: {:?}",
926				unchecked_bitfield.unchecked_payload().0.clone() & disputed_bitfield.0.clone()
927			);
928			continue
929		}
930
931		let validator_index = unchecked_bitfield.unchecked_validator_index();
932
933		if !last_index.map_or(true, |last_index: ValidatorIndex| last_index < validator_index) {
934			log::trace!(
935				target: LOG_TARGET,
936				"bitfield validator index is not greater than last: !({:?} < {})",
937				last_index.as_ref().map(|x| x.0),
938				validator_index.0
939			);
940			continue
941		}
942
943		if unchecked_bitfield.unchecked_validator_index().0 as usize >= validators.len() {
944			log::trace!(
945				target: LOG_TARGET,
946				"bitfield validator index is out of bounds: {} >= {}",
947				validator_index.0,
948				validators.len(),
949			);
950			continue
951		}
952
953		let validator_public = &validators[validator_index.0 as usize];
954
955		// Validate bitfield signature.
956		if let Ok(signed_bitfield) =
957			unchecked_bitfield.try_into_checked(&signing_context, validator_public)
958		{
959			bitfields.push(signed_bitfield);
960			METRICS.on_valid_bitfield_signature();
961		} else {
962			log::warn!(target: LOG_TARGET, "Invalid bitfield signature");
963			METRICS.on_invalid_bitfield_signature();
964		};
965
966		last_index = Some(validator_index);
967	}
968	bitfields
969}
970
/// Perform the required sanity checks for the given backed candidate receipt.
///
/// Returns `false` (i.e. the candidate should be dropped) if:
/// - the descriptor version is unknown
/// - version 2 descriptors are not allowed by the node features
/// - the relay parent is not in the allowed relay parents
/// - the core index in the descriptor doesn't match the one computed from the commitments
/// - the `SelectCore` signal does not refer to a core at the top of the claim queue
/// - the session index of a version 2 descriptor does not match the current session
///
/// Otherwise returns `true`.
979fn sanitize_backed_candidate_v2<T: crate::inclusion::Config>(
980	candidate: &BackedCandidate<T::Hash>,
981	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
982	allow_v2_receipts: bool,
983) -> bool {
984	let descriptor_version = candidate.descriptor().version();
985
986	if descriptor_version == CandidateDescriptorVersion::Unknown {
987		log::debug!(
988			target: LOG_TARGET,
989			"Candidate with unknown descriptor version. Dropping candidate {:?} for paraid {:?}.",
990			candidate.candidate().hash(),
991			candidate.descriptor().para_id()
992		);
993		return false
994	}
995
	// It is mandatory to filter these before calling `filter_unchained_candidates` to ensure
	// we drop any descendants of the dropped v2 candidates.
998	if descriptor_version == CandidateDescriptorVersion::V2 && !allow_v2_receipts {
999		log::debug!(
1000			target: LOG_TARGET,
1001			"V2 candidate descriptors not allowed. Dropping candidate {:?} for paraid {:?}.",
1002			candidate.candidate().hash(),
1003			candidate.descriptor().para_id()
1004		);
1005		return false
1006	}
1007
1008	// Get the claim queue snapshot at the candidate relay parent.
1009	let Some((rp_info, _)) =
1010		allowed_relay_parents.acquire_info(candidate.descriptor().relay_parent(), None)
1011	else {
1012		log::debug!(
1013			target: LOG_TARGET,
1014			"Relay parent {:?} for candidate {:?} is not in the allowed relay parents.",
1015			candidate.descriptor().relay_parent(),
1016			candidate.candidate().hash(),
1017		);
1018		return false
1019	};
1020
1021	if let Err(err) = candidate.candidate().parse_ump_signals(&rp_info.claim_queue) {
1022		log::debug!(
1023			target: LOG_TARGET,
1024			"UMP signal check failed: {:?}. Dropping candidate {:?} for paraid {:?}.",
1025			err,
1026			candidate.candidate().hash(),
1027			candidate.descriptor().para_id()
1028		);
1029		return false
1030	}
1031
1032	if descriptor_version == CandidateDescriptorVersion::V1 {
1033		// Nothing more to check for v1 descriptors.
1034		return true
1035	}
1036
1037	let Some(session_index) = candidate.descriptor().session_index() else {
1038		log::debug!(
1039			target: LOG_TARGET,
1040			"Invalid V2 candidate receipt {:?} for paraid {:?}, missing session index.",
1041			candidate.candidate().hash(),
1042			candidate.descriptor().para_id(),
1043		);
1044		return false
1045	};
1046
1047	// Check if session index is equal to current session index.
1048	if session_index != shared::CurrentSessionIndex::<T>::get() {
1049		log::debug!(
1050			target: LOG_TARGET,
1051			"Dropping V2 candidate receipt {:?} for paraid {:?}, invalid session index {}, current session {}",
1052			candidate.candidate().hash(),
1053			candidate.descriptor().para_id(),
1054			session_index,
1055			shared::CurrentSessionIndex::<T>::get()
1056		);
1057		return false
1058	}
1059
1060	true
1061}
1062
1063/// Performs various filtering on the backed candidates inherent data.
1064/// Must maintain the invariant that the returned candidate collection contains the candidates
1065/// sorted in dependency order for each para. When doing any filtering, we must therefore drop any
1066/// subsequent candidates after the filtered one.
1067///
1068/// Filter out:
1069/// 1. Candidates that have v2 descriptors if the node `CandidateReceiptV2` feature is not enabled.
1070/// 2. any candidates which don't form a chain with the other candidates of the paraid (even if they
1071///    do form a chain but are not in the right order).
1072/// 3. any candidates that have a concluded invalid dispute or who are descendants of a concluded
1073///    invalid candidate.
1074/// 4. any unscheduled candidates, as well as candidates whose paraid has multiple cores assigned
1075///    but have no core index (either injected or in the v2 descriptor).
1076/// 5. all backing votes from disabled validators
1077/// 6. any candidates that end up with less than `effective_minimum_backing_votes` backing votes
1078///
1079/// Returns the scheduled
1080/// backed candidates which passed filtering, mapped by para id and in the right dependency order.
1081fn sanitize_backed_candidates<T: crate::inclusion::Config>(
1082	backed_candidates: Vec<BackedCandidate<T::Hash>>,
1083	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1084	concluded_invalid_with_descendants: BTreeSet<CandidateHash>,
1085	scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
1086	allow_v2_receipts: bool,
1087) -> BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>> {
1088	// Map the candidates to the right paraids, while making sure that the order between candidates
1089	// of the same para is preserved.
1090	let mut candidates_per_para: BTreeMap<ParaId, Vec<_>> = BTreeMap::new();
1091
1092	for candidate in backed_candidates {
1093		if !sanitize_backed_candidate_v2::<T>(&candidate, allowed_relay_parents, allow_v2_receipts)
1094		{
1095			continue
1096		}
1097
1098		candidates_per_para
1099			.entry(candidate.descriptor().para_id())
1100			.or_default()
1101			.push(candidate);
1102	}
1103
1104	// Check that candidates pertaining to the same para form a chain. Drop the ones that
1105	// don't, along with the rest of candidates which follow them in the input vector.
1106	filter_unchained_candidates::<T>(&mut candidates_per_para, allowed_relay_parents);
1107
1108	// Remove any candidates that were concluded invalid or who are descendants of concluded invalid
1109	// candidates (along with their descendants).
1110	retain_candidates::<T, _, _>(&mut candidates_per_para, |_, candidate| {
1111		let keep = !concluded_invalid_with_descendants.contains(&candidate.candidate().hash());
1112
1113		if !keep {
1114			log::debug!(
1115				target: LOG_TARGET,
1116				"Found backed candidate {:?} which was concluded invalid or is a descendant of a concluded invalid candidate, for paraid {:?}.",
1117				candidate.candidate().hash(),
1118				candidate.descriptor().para_id()
1119			);
1120		}
1121		keep
1122	});
1123
1124	// Map candidates to scheduled cores. Filter out any unscheduled candidates along with their
1125	// descendants.
1126	let mut backed_candidates_with_core =
1127		map_candidates_to_cores::<T>(&allowed_relay_parents, scheduled, candidates_per_para);
1128
1129	// Filter out backing statements from disabled validators. If by that we render a candidate with
1130	// less backing votes than required, filter that candidate also. As all the other filtering
1131	// operations above, we drop the descendants of the dropped candidates also.
1132	filter_backed_statements_from_disabled_validators::<T>(
1133		&mut backed_candidates_with_core,
1134		&allowed_relay_parents,
1135	);
1136
1137	backed_candidates_with_core
1138}
1139
1140fn count_backed_candidates<B>(backed_candidates: &BTreeMap<ParaId, Vec<B>>) -> usize {
1141	backed_candidates.values().map(|c| c.len()).sum()
1142}
1143
/// Derive entropy from the BABE-provided per-block randomness.
///
/// In the odd case that none is available, the `parent_hash` and a const value are used instead,
/// and a warning is emitted.
1148fn compute_entropy<T: Config>(parent_hash: T::Hash) -> [u8; 32] {
1149	const CANDIDATE_SEED_SUBJECT: [u8; 32] = *b"candidate-seed-selection-subject";
1150	// NOTE: this is slightly gameable since this randomness was already public
1151	// by the previous block, while for the block author this randomness was
1152	// known 2 epochs ago. it is marginally better than using the parent block
1153	// hash since it's harder to influence the VRF output than the block hash.
1154	let vrf_random = ParentBlockRandomness::<T>::random(&CANDIDATE_SEED_SUBJECT[..]).0;
1155	let mut entropy: [u8; 32] = CANDIDATE_SEED_SUBJECT;
1156	if let Some(vrf_random) = vrf_random {
1157		entropy.as_mut().copy_from_slice(vrf_random.as_ref());
1158	} else {
1159		// in case there is no VRF randomness present, we utilize the relay parent
1160		// as seed, it's better than a static value.
1161		log::warn!(target: LOG_TARGET, "ParentBlockRandomness did not provide entropy");
1162		entropy.as_mut().copy_from_slice(parent_hash.as_ref());
1163	}
1164	entropy
1165}
1166
/// Limit disputes in place.
///
/// Assumes the disputes are already deduplicated and sorted; retains that ordering.
///
/// Prime source of overload safety for dispute votes:
/// 1. Check whether the accumulated weight of all provided dispute statement sets exceeds the
///    maximum block weight.
/// 2. If it does: pick dispute statement sets in order (older ones, i.e. lower indices, first),
///    checking their validity, until the weight limit is reached.
/// 3. If it does not: just filter out the invalid dispute statement sets and keep the rest.
///
/// Returns the checked dispute statement sets and their consumed weight, which is guaranteed not
/// to exceed the provided `max_consumable_weight`.
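///
/// E.g. with a `max_consumable_weight` of `10` and three dispute statement sets weighing `4` each,
/// only the first two sets are validity-checked and (if valid) returned, and the reported consumed
/// weight is `8`.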
1181fn limit_and_sanitize_disputes<
1182	T: Config,
1183	CheckValidityFn: FnMut(DisputeStatementSet) -> Option<CheckedDisputeStatementSet>,
1184>(
1185	disputes: MultiDisputeStatementSet,
1186	mut dispute_statement_set_valid: CheckValidityFn,
1187	max_consumable_weight: Weight,
1188) -> (Vec<CheckedDisputeStatementSet>, Weight) {
1189	// The total weight if all disputes would be included
1190	let disputes_weight = multi_dispute_statement_sets_weight::<T>(&disputes);
1191
1192	if disputes_weight.any_gt(max_consumable_weight) {
1193		log::debug!(target: LOG_TARGET, "Above max consumable weight: {}/{}", disputes_weight, max_consumable_weight);
1194		let mut checked_acc = Vec::<CheckedDisputeStatementSet>::with_capacity(disputes.len());
1195
1196		// Accumulated weight of all disputes picked, that passed the checks.
1197		let mut weight_acc = Weight::zero();
1198
1199		// Select disputes in-order until the remaining weight is attained
1200		disputes.into_iter().for_each(|dss| {
1201			let dispute_weight = dispute_statement_set_weight::<T, &DisputeStatementSet>(&dss);
1202			let updated = weight_acc.saturating_add(dispute_weight);
1203			if max_consumable_weight.all_gte(updated) {
1204				// Always apply the weight. Invalid data cost processing time too:
1205				weight_acc = updated;
1206				if let Some(checked) = dispute_statement_set_valid(dss) {
1207					checked_acc.push(checked);
1208				}
1209			}
1210		});
1211
1212		(checked_acc, weight_acc)
1213	} else {
1214		// Go through all of them, and just apply the filter, they would all fit
1215		let checked = disputes
1216			.into_iter()
1217			.filter_map(|dss| dispute_statement_set_valid(dss))
1218			.collect::<Vec<CheckedDisputeStatementSet>>();
1219		// some might have been filtered out, so re-calc the weight
1220		let checked_disputes_weight = checked_multi_dispute_statement_sets_weight::<T>(&checked);
1221		(checked, checked_disputes_weight)
1222	}
1223}
1224
// Helper function for filtering candidates which don't pass the given predicate. As soon as the
// first candidate failing the predicate is found, it and all the candidates that follow it are
// dropped.
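//
// E.g. if the predicate fails for the 3rd of 5 candidates of a para, only the first two are kept;
// paras that end up with no candidates are removed from the map entirely.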
1227fn retain_candidates<
	T: inclusion::Config + paras::Config,
1229	F: FnMut(ParaId, &mut C) -> bool,
1230	C,
1231>(
1232	candidates_per_para: &mut BTreeMap<ParaId, Vec<C>>,
1233	mut pred: F,
1234) {
1235	for (para_id, candidates) in candidates_per_para.iter_mut() {
1236		let mut latest_valid_idx = None;
1237
1238		for (idx, candidate) in candidates.iter_mut().enumerate() {
1239			if pred(*para_id, candidate) {
1240				// Found a valid candidate.
1241				latest_valid_idx = Some(idx);
1242			} else {
1243				break
1244			}
1245		}
1246
1247		if let Some(latest_valid_idx) = latest_valid_idx {
1248			candidates.truncate(latest_valid_idx + 1);
1249		} else {
1250			candidates.clear();
1251		}
1252	}
1253
1254	candidates_per_para.retain(|_, c| !c.is_empty());
1255}
1256
1257// Filters statements from disabled validators in `BackedCandidate` and does a few more sanity
1258// checks.
1259fn filter_backed_statements_from_disabled_validators<
1260	T: shared::Config + scheduler::Config + inclusion::Config,
1261>(
1262	backed_candidates_with_core: &mut BTreeMap<
1263		ParaId,
1264		Vec<(BackedCandidate<<T as frame_system::Config>::Hash>, CoreIndex)>,
1265	>,
1266	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1267) {
1268	let disabled_validators =
1269		BTreeSet::<_>::from_iter(shared::Pallet::<T>::disabled_validators().into_iter());
1270
1271	if disabled_validators.is_empty() {
1272		// No disabled validators - nothing to do
1273		return
1274	}
1275
1276	let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;
1277
1278	// Process all backed candidates. `validator_indices` in `BackedCandidates` are indices within
1279	// the validator group assigned to the parachain. To obtain this group we need:
1280	// 1. Core index assigned to the parachain which has produced the candidate
1281	// 2. The relay chain block number of the candidate
1282	retain_candidates::<T, _, _>(backed_candidates_with_core, |para_id, (bc, core_idx)| {
1283		// `CoreIndex` not used, we just need a copy to write it back later.
1284		let (validator_indices, maybe_injected_core_index) = bc.validator_indices_and_core_index();
1285		let mut validator_indices = BitVec::<_>::from(validator_indices);
1286
1287		// Get relay parent block number of the candidate. We need this to get the group index
1288		// assigned to this core at this block number
1289		let relay_parent_block_number = match allowed_relay_parents
1290			.acquire_info(bc.descriptor().relay_parent(), None)
1291		{
1292			Some((_, block_num)) => block_num,
1293			None => {
1294				log::debug!(
1295					target: LOG_TARGET,
1296					"Relay parent {:?} for candidate is not in the allowed relay parents. Dropping the candidate.",
1297					bc.descriptor().relay_parent()
1298				);
1299				return false
1300			},
1301		};
1302
1303		// Get the group index for the core
1304		let group_idx = match scheduler::Pallet::<T>::group_assigned_to_core(
1305			*core_idx,
1306			relay_parent_block_number + One::one(),
1307		) {
1308			Some(group_idx) => group_idx,
1309			None => {
1310				log::debug!(target: LOG_TARGET, "Can't get the group index for core idx {:?}. Dropping the candidate.", core_idx);
1311				return false
1312			},
1313		};
1314
1315		// And finally get the validator group for this group index
1316		let validator_group = match scheduler::Pallet::<T>::group_validators(group_idx) {
1317			Some(validator_group) => validator_group,
1318			None => {
1319				log::debug!(target: LOG_TARGET, "Can't get the validators from group {:?}. Dropping the candidate.", group_idx);
1320				return false
1321			},
1322		};
1323
1324		// Bitmask with the disabled indices within the validator group
1325		let disabled_indices = BitVec::<u8, bitvec::order::Lsb0>::from_iter(
1326			validator_group.iter().map(|idx| disabled_validators.contains(idx)),
1327		);
1328		// The indices of statements from disabled validators in `BackedCandidate`. We have to drop
1329		// these.
1330		let indices_to_drop = disabled_indices.clone() & &validator_indices;
1331
1332		// Remove the corresponding votes from `validity_votes`
1333		for idx in indices_to_drop.iter_ones().rev() {
			// Map the index in `indices_to_drop` (which is an index into the validator group)
			// to the index in the validity votes vector, which might have fewer votes than there
			// are validators assigned to the group.
1337			//
1338			// For each index `idx` in `indices_to_drop`, the corresponding index in the
1339			// validity votes vector is the number of `1` bits in `validator_indices` before `idx`.
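			//
			// E.g. with `validator_indices = 0b01011` (LSB first) and `idx = 3`, there are two set
			// bits before `idx`, so the vote at position 2 of `validity_votes` is removed.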
1340			let mapped_idx = validator_indices[..idx].count_ones();
1341			bc.validity_votes_mut().remove(mapped_idx);
1342		}
1343
1344		// Apply the bitmask to drop the disabled validator from `validator_indices`
1345		validator_indices &= !disabled_indices;
1346		// Update the backed candidate
1347		bc.set_validator_indices_and_core_index(validator_indices, maybe_injected_core_index);
1348
1349		// By filtering votes we might render the candidate invalid and cause a failure in
1350		// [`process_candidates`]. To avoid this we have to perform a sanity check here. If there
1351		// are not enough backing votes after filtering we will remove the whole candidate.
1352		if bc.validity_votes().len() <
1353			effective_minimum_backing_votes(validator_group.len(), minimum_backing_votes)
1354		{
1355			log::debug!(
1356				target: LOG_TARGET,
1357				"Dropping candidate {:?} of paraid {:?} because it was left with too few backing votes after votes from disabled validators were filtered.",
1358				bc.candidate().hash(),
1359				para_id
1360			);
1361
1362			return false
1363		}
1364
1365		true
1366	});
1367}
1368
1369// Check that candidates pertaining to the same para form a chain. Drop the ones that
1370// don't, along with the rest of candidates which follow them in the input vector.
1371// In the process, duplicated candidates will also be dropped (even if they form a valid cycle;
1372// cycles are not allowed if they entail backing duplicated candidates).
fn filter_unchained_candidates<T: inclusion::Config + paras::Config>(
1374	candidates: &mut BTreeMap<ParaId, Vec<BackedCandidate<T::Hash>>>,
1375	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1376) {
1377	let mut para_latest_context: BTreeMap<ParaId, (HeadData, BlockNumberFor<T>)> = BTreeMap::new();
1378	for para_id in candidates.keys() {
1379		let Some(latest_head_data) = inclusion::Pallet::<T>::para_latest_head_data(&para_id) else {
1380			defensive!("Latest included head data for paraid {:?} is None", para_id);
1381			continue
1382		};
1383		let Some(latest_relay_parent) = inclusion::Pallet::<T>::para_most_recent_context(&para_id)
1384		else {
1385			defensive!("Latest relay parent for paraid {:?} is None", para_id);
1386			continue
1387		};
1388		para_latest_context.insert(*para_id, (latest_head_data, latest_relay_parent));
1389	}
1390
1391	let mut para_visited_candidates: BTreeMap<ParaId, BTreeSet<CandidateHash>> = BTreeMap::new();
1392
1393	retain_candidates::<T, _, _>(candidates, |para_id, candidate| {
1394		let Some((latest_head_data, latest_relay_parent)) = para_latest_context.get(&para_id)
1395		else {
1396			return false
1397		};
1398		let candidate_hash = candidate.candidate().hash();
1399
1400		let visited_candidates =
1401			para_visited_candidates.entry(para_id).or_insert_with(|| BTreeSet::new());
1402		if visited_candidates.contains(&candidate_hash) {
1403			log::debug!(
1404				target: LOG_TARGET,
1405				"Found duplicate candidates for paraid {:?}. Dropping the candidates with hash {:?}",
1406				para_id,
1407				candidate_hash
1408			);
1409
1410			// If we got a duplicate candidate, stop.
1411			return false
1412		} else {
1413			visited_candidates.insert(candidate_hash);
1414		}
1415
1416		let check_ctx = CandidateCheckContext::<T>::new(Some(*latest_relay_parent));
1417
1418		match check_ctx.verify_backed_candidate(
1419			&allowed_relay_parents,
1420			candidate.candidate(),
1421			latest_head_data.clone(),
1422		) {
1423			Ok(relay_parent_block_number) => {
1424				para_latest_context.insert(
1425					para_id,
1426					(
1427						candidate.candidate().commitments.head_data.clone(),
1428						relay_parent_block_number,
1429					),
1430				);
1431				true
1432			},
1433			Err(err) => {
1434				log::debug!(
1435					target: LOG_TARGET,
1436					"Backed candidate verification for candidate {:?} of paraid {:?} failed with {:?}",
1437					candidate_hash,
1438					para_id,
1439					err
1440				);
1441				false
1442			},
1443		}
1444	});
1445}
1446
/// Map candidates to scheduled cores.
///
/// If the para only has one scheduled core and one candidate supplied, map the candidate to the
/// single core. If the para has multiple cores scheduled, only map the candidates which carry a
/// core index (either injected or in the v2 descriptor) and filter out the rest.
/// When dropping a candidate of a para, all subsequent candidates of that para must be dropped as
/// well (because they form a chain).
///
/// Returns the remaining candidates together with their cores, grouped by para id.
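///
/// E.g. a para with cores {0, 1} scheduled and candidates [C1 (core 0), C2 (core 5)] keeps only
/// C1, mapped to core 0; C2 and any candidates after it are dropped, since core 5 is not
/// scheduled for this para.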
1454fn map_candidates_to_cores<T: configuration::Config + scheduler::Config + inclusion::Config>(
1455	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1456	mut scheduled: BTreeMap<ParaId, BTreeSet<CoreIndex>>,
1457	candidates: BTreeMap<ParaId, Vec<BackedCandidate<T::Hash>>>,
1458) -> BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>> {
1459	let mut backed_candidates_with_core = BTreeMap::new();
1460
1461	for (para_id, backed_candidates) in candidates.into_iter() {
1462		if backed_candidates.len() == 0 {
1463			defensive!("Backed candidates for paraid {} is empty.", para_id);
1464			continue
1465		}
1466
1467		let Some(scheduled_cores) = scheduled.get_mut(&para_id) else {
1468			log::debug!(
1469				target: LOG_TARGET,
1470				"Paraid: {:?} has no entry in scheduled cores but {} candidates were supplied.",
1471				para_id,
1472				backed_candidates.len()
1473			);
1474			continue
1475		};
1476
1477		// ParaIds without scheduled cores are silently filtered out.
1478		if scheduled_cores.len() == 0 {
1479			log::debug!(
1480				target: LOG_TARGET,
1481				"Paraid: {:?} has no scheduled cores but {} candidates were supplied.",
1482				para_id,
1483				backed_candidates.len()
1484			);
1485			continue
1486		}
1487
1488		// We must preserve the dependency order given in the input.
1489		let mut temp_backed_candidates = Vec::with_capacity(scheduled_cores.len());
1490
1491		for candidate in backed_candidates {
1492			if scheduled_cores.len() == 0 {
1493				// We've got candidates for all of this para's assigned cores. Move on to
1494				// the next para.
1495				log::debug!(
1496					target: LOG_TARGET,
1497					"Found enough candidates for paraid: {:?}.",
1498					candidate.descriptor().para_id()
1499				);
1500				break;
1501			}
1502
1503			if let Some(core_index) = get_core_index::<T>(allowed_relay_parents, &candidate) {
1504				if scheduled_cores.remove(&core_index) {
1505					temp_backed_candidates.push((candidate, core_index));
1506				} else {
1507					// if we got a candidate for a core index which is not scheduled, stop
1508					// the work for this para. the already processed candidate chain in
1509					// temp_backed_candidates is still fine though.
1510					log::debug!(
1511						target: LOG_TARGET,
1512						"Found a backed candidate {:?} with core index {}, which is not scheduled for paraid {:?}.",
1513						candidate.candidate().hash(),
1514						core_index.0,
1515						candidate.descriptor().para_id()
1516					);
1517
1518					break;
1519				}
1520			} else {
1521				// if we got a candidate which does not contain its core index, stop the
1522				// work for this para. the already processed candidate chain in
1523				// temp_backed_candidates is still fine though.
1524
1525				log::debug!(
1526					target: LOG_TARGET,
1527					"Found a backed candidate {:?} without core index information for para {:?}, dropping",
1528					candidate.candidate().hash(),
1529					candidate.descriptor().para_id()
1530				);
1531
1532				break;
1533			}
1534		}
1535
1536		if !temp_backed_candidates.is_empty() {
1537			backed_candidates_with_core
1538				.entry(para_id)
1539				.or_insert_with(|| vec![])
1540				.extend(temp_backed_candidates);
1541		}
1542	}
1543
1544	backed_candidates_with_core
1545}
1546
1547// Must be called only for candidates that have been sanitized already.
1548fn get_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
1549	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1550	candidate: &BackedCandidate<T::Hash>,
1551) -> Option<CoreIndex> {
1552	candidate
1553		.candidate()
1554		.descriptor
1555		.core_index()
1556		.or_else(|| get_injected_core_index::<T>(allowed_relay_parents, &candidate))
1557}
1558
1559fn get_injected_core_index<T: configuration::Config + scheduler::Config + inclusion::Config>(
1560	allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1561	candidate: &BackedCandidate<T::Hash>,
1562) -> Option<CoreIndex> {
1563	// After stripping the 8 bit extensions, the `validator_indices` field length is expected
1564	// to be equal to backing group size. If these don't match, the `CoreIndex` is badly encoded,
1565	// or not supported.
1566	let (validator_indices, Some(core_idx)) = candidate.validator_indices_and_core_index() else {
1567		return None
1568	};
1569
1570	let relay_parent_block_number =
1571		match allowed_relay_parents.acquire_info(candidate.descriptor().relay_parent(), None) {
1572			Some((_, block_num)) => block_num,
1573			None => {
1574				log::debug!(
1575					target: LOG_TARGET,
1576					"Relay parent {:?} for candidate {:?} is not in the allowed relay parents.",
1577					candidate.descriptor().relay_parent(),
1578					candidate.candidate().hash(),
1579				);
1580				return None
1581			},
1582		};
1583
1584	// Get the backing group of the candidate backed at `core_idx`.
1585	let group_idx = match scheduler::Pallet::<T>::group_assigned_to_core(
1586		core_idx,
1587		relay_parent_block_number + One::one(),
1588	) {
1589		Some(group_idx) => group_idx,
1590		None => {
1591			log::debug!(
1592				target: LOG_TARGET,
1593				"Can't get the group index for core idx {:?}.",
1594				core_idx,
1595			);
1596			return None
1597		},
1598	};
1599
1600	let group_validators = match scheduler::Pallet::<T>::group_validators(group_idx) {
1601		Some(validators) => validators,
1602		None => return None,
1603	};
1604
1605	if group_validators.len() == validator_indices.len() {
1606		Some(core_idx)
1607	} else {
1608		log::debug!(
1609			target: LOG_TARGET,
			"Expected validator_indices length {} does not match the actual length {} for candidate {:?}",
1611			group_validators.len(),
1612			validator_indices.len(),
1613			candidate.candidate().hash()
1614		);
1615
1616		None
1617	}
1618}