polkadot_runtime_parachains/inclusion/
mod.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! The inclusion pallet is responsible for inclusion and availability of scheduled parachains.
18//!
19//! It is responsible for carrying candidates from being backable to being backed, and then from
20//! backed to included.
21
22use crate::{
23	configuration::{self, HostConfiguration},
24	disputes, dmp, hrmp,
25	paras::{self, UpgradeStrategy},
26	scheduler,
27	shared::{self, AllowedRelayParentsTracker},
28	util::make_persisted_validation_data_with_parent,
29};
30use alloc::{
31	collections::{btree_map::BTreeMap, btree_set::BTreeSet, vec_deque::VecDeque},
32	vec,
33	vec::Vec,
34};
35use bitvec::{order::Lsb0 as BitOrderLsb0, vec::BitVec};
36use codec::{Decode, DecodeWithMemTracking, Encode};
37use core::fmt;
38use frame_support::{
39	defensive,
40	pallet_prelude::*,
41	traits::{EnqueueMessage, Footprint, QueueFootprint, QueueFootprintQuery},
42	BoundedSlice,
43};
44use frame_system::pallet_prelude::*;
45use pallet_message_queue::OnQueueChanged;
46use polkadot_primitives::{
47	effective_minimum_backing_votes, supermajority_threshold,
48	vstaging::{
49		skip_ump_signals, BackedCandidate, CandidateDescriptorV2 as CandidateDescriptor,
50		CandidateReceiptV2 as CandidateReceipt,
51		CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
52	},
53	well_known_keys, CandidateCommitments, CandidateHash, CoreIndex, GroupIndex, HeadData,
54	Id as ParaId, SignedAvailabilityBitfields, SigningContext, UpwardMessage, ValidatorId,
55	ValidatorIndex, ValidityAttestation,
56};
57use scale_info::TypeInfo;
58use sp_runtime::{traits::One, DispatchError, SaturatedConversion, Saturating};
59
60pub use pallet::*;
61
62#[cfg(test)]
63pub(crate) mod tests;
64
65#[cfg(feature = "runtime-benchmarks")]
66mod benchmarking;
67
68pub mod migration;
69
70pub trait WeightInfo {
71	/// Weight for `enact_candidate` extrinsic given the number of sent messages
72	/// (ump, hrmp) and whether there is a new code for a runtime upgrade.
73	///
74	/// NOTE: due to a shortcoming of the current benchmarking framework,
75	/// we use `u32` for the code upgrade, even though it is a `bool`.
76	fn enact_candidate(u: u32, h: u32, c: u32) -> Weight;
77}
78
79pub struct TestWeightInfo;
80impl WeightInfo for TestWeightInfo {
81	fn enact_candidate(_u: u32, _h: u32, _c: u32) -> Weight {
82		Weight::zero()
83	}
84}
85
86impl WeightInfo for () {
87	fn enact_candidate(_u: u32, _h: u32, _c: u32) -> Weight {
88		Weight::zero()
89	}
90}
91
92/// Maximum value that `config.max_upward_message_size` can be set to.
93///
94/// This is used for benchmarking sanely bounding relevant storage items. It is expected from the
95/// `configuration` pallet to check these values before setting.
96pub const MAX_UPWARD_MESSAGE_SIZE_BOUND: u32 = 128 * 1024;
97
98/// A backed candidate pending availability.
99#[derive(Encode, Decode, PartialEq, TypeInfo, Clone)]
100#[cfg_attr(test, derive(Debug))]
101pub struct CandidatePendingAvailability<H, N> {
102	/// The availability core this is assigned to.
103	core: CoreIndex,
104	/// The candidate hash.
105	hash: CandidateHash,
106	/// The candidate descriptor.
107	descriptor: CandidateDescriptor<H>,
108	/// The candidate commitments.
109	commitments: CandidateCommitments,
110	/// The received availability votes. One bit per validator.
111	availability_votes: BitVec<u8, BitOrderLsb0>,
112	/// The backers of the candidate pending availability.
113	backers: BitVec<u8, BitOrderLsb0>,
114	/// The block number of the relay-parent of the receipt.
115	relay_parent_number: N,
116	/// The block number of the relay-chain block this was backed in.
117	backed_in_number: N,
118	/// The group index backing this block.
119	backing_group: GroupIndex,
120}
121
122impl<H, N> CandidatePendingAvailability<H, N> {
123	/// Get the availability votes on the candidate.
124	pub(crate) fn availability_votes(&self) -> &BitVec<u8, BitOrderLsb0> {
125		&self.availability_votes
126	}
127
128	/// Get the relay-chain block number this was backed in.
129	pub(crate) fn backed_in_number(&self) -> N
130	where
131		N: Clone,
132	{
133		self.backed_in_number.clone()
134	}
135
136	/// Get the core index.
137	pub(crate) fn core_occupied(&self) -> CoreIndex {
138		self.core
139	}
140
141	/// Get the candidate hash.
142	pub(crate) fn candidate_hash(&self) -> CandidateHash {
143		self.hash
144	}
145
146	/// Get the candidate descriptor.
147	pub(crate) fn candidate_descriptor(&self) -> &CandidateDescriptor<H> {
148		&self.descriptor
149	}
150
151	/// Get the candidate commitments.
152	pub(crate) fn candidate_commitments(&self) -> &CandidateCommitments {
153		&self.commitments
154	}
155
156	/// Get the candidate's relay parent's number.
157	pub(crate) fn relay_parent_number(&self) -> N
158	where
159		N: Clone,
160	{
161		self.relay_parent_number.clone()
162	}
163
164	#[cfg(any(feature = "runtime-benchmarks", test))]
165	pub(crate) fn new(
166		core: CoreIndex,
167		hash: CandidateHash,
168		descriptor: CandidateDescriptor<H>,
169		commitments: CandidateCommitments,
170		availability_votes: BitVec<u8, BitOrderLsb0>,
171		backers: BitVec<u8, BitOrderLsb0>,
172		relay_parent_number: N,
173		backed_in_number: N,
174		backing_group: GroupIndex,
175	) -> Self {
176		Self {
177			core,
178			hash,
179			descriptor,
180			commitments,
181			availability_votes,
182			backers,
183			relay_parent_number,
184			backed_in_number,
185			backing_group,
186		}
187	}
188}
189
190/// A hook for applying validator rewards
191pub trait RewardValidators {
192	// Reward the validators with the given indices for issuing backing statements.
193	fn reward_backing(validators: impl IntoIterator<Item = ValidatorIndex>);
194	// Reward the validators with the given indices for issuing availability bitfields.
195	// Validators are sent to this hook when they have contributed to the availability
196	// of a candidate by setting a bit in their bitfield.
197	fn reward_bitfields(validators: impl IntoIterator<Item = ValidatorIndex>);
198}
199
200impl RewardValidators for () {
201	fn reward_backing(_: impl IntoIterator<Item = ValidatorIndex>) {}
202	fn reward_bitfields(_: impl IntoIterator<Item = ValidatorIndex>) {}
203}
204
205/// Reads the footprint of queues for a specific origin type.
206pub trait QueueFootprinter {
207	type Origin;
208
209	fn message_count(origin: Self::Origin) -> u64;
210}
211
212impl QueueFootprinter for () {
213	type Origin = UmpQueueId;
214
215	fn message_count(_: Self::Origin) -> u64 {
216		0
217	}
218}
219
220/// Aggregate message origin for the `MessageQueue` pallet.
221///
222/// Can be extended to serve further use-cases besides just UMP. Is stored in storage, so any change
223/// to existing values will require a migration.
224#[derive(
225	Encode,
226	Decode,
227	DecodeWithMemTracking,
228	Clone,
229	MaxEncodedLen,
230	Eq,
231	PartialEq,
232	RuntimeDebug,
233	TypeInfo,
234)]
235pub enum AggregateMessageOrigin {
236	/// Inbound upward message.
237	#[codec(index = 0)]
238	Ump(UmpQueueId),
239}
240
241/// Identifies a UMP queue inside the `MessageQueue` pallet.
242///
243/// It is written in verbose form since future variants like `Here` and `Bridged` are already
244/// foreseeable.
245#[derive(
246	Encode,
247	Decode,
248	DecodeWithMemTracking,
249	Clone,
250	MaxEncodedLen,
251	Eq,
252	PartialEq,
253	RuntimeDebug,
254	TypeInfo,
255)]
256pub enum UmpQueueId {
257	/// The message originated from this parachain.
258	#[codec(index = 0)]
259	Para(ParaId),
260}
261
262#[cfg(feature = "runtime-benchmarks")]
263impl From<u32> for AggregateMessageOrigin {
264	fn from(n: u32) -> Self {
265		// Some dummy for the benchmarks.
266		Self::Ump(UmpQueueId::Para(n.into()))
267	}
268}
269
270/// The maximal length of a UMP message.
271pub type MaxUmpMessageLenOf<T> =
272	<<T as Config>::MessageQueue as EnqueueMessage<AggregateMessageOrigin>>::MaxMessageLen;
273
274#[frame_support::pallet]
275pub mod pallet {
276	use super::*;
277
278	const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
279	#[pallet::pallet]
280	#[pallet::without_storage_info]
281	#[pallet::storage_version(STORAGE_VERSION)]
282	pub struct Pallet<T>(_);
283
284	#[pallet::config]
285	pub trait Config:
286		frame_system::Config
287		+ shared::Config
288		+ paras::Config
289		+ dmp::Config
290		+ hrmp::Config
291		+ configuration::Config
292		+ scheduler::Config
293	{
294		#[allow(deprecated)]
295		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
296		type DisputesHandler: disputes::DisputesHandler<BlockNumberFor<Self>>;
297		type RewardValidators: RewardValidators;
298
299		/// The system message queue.
300		///
301		/// The message queue provides general queueing and processing functionality. Currently it
302		/// replaces the old `UMP` dispatch queue. Other use-cases can be implemented as well by
303		/// adding new variants to `AggregateMessageOrigin`.
304		type MessageQueue: EnqueueMessage<AggregateMessageOrigin>
305			+ QueueFootprintQuery<AggregateMessageOrigin, MaxMessageLen = MaxUmpMessageLenOf<Self>>;
306
307		/// Weight info for the calls of this pallet.
308		type WeightInfo: WeightInfo;
309	}
310
311	#[pallet::event]
312	#[pallet::generate_deposit(pub(super) fn deposit_event)]
313	pub enum Event<T: Config> {
314		/// A candidate was backed. `[candidate, head_data]`
315		CandidateBacked(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
316		/// A candidate was included. `[candidate, head_data]`
317		CandidateIncluded(CandidateReceipt<T::Hash>, HeadData, CoreIndex, GroupIndex),
318		/// A candidate timed out. `[candidate, head_data]`
319		CandidateTimedOut(CandidateReceipt<T::Hash>, HeadData, CoreIndex),
320		/// Some upward messages have been received and will be processed.
321		UpwardMessagesReceived { from: ParaId, count: u32 },
322	}
323
324	#[pallet::error]
325	pub enum Error<T> {
326		/// Validator index out of bounds.
327		ValidatorIndexOutOfBounds,
328		/// Candidate submitted but para not scheduled.
329		UnscheduledCandidate,
330		/// Head data exceeds the configured maximum.
331		HeadDataTooLarge,
332		/// Code upgrade prematurely.
333		PrematureCodeUpgrade,
334		/// Output code is too large
335		NewCodeTooLarge,
336		/// The candidate's relay-parent was not allowed. Either it was
337		/// not recent enough or it didn't advance based on the last parachain block.
338		DisallowedRelayParent,
339		/// Failed to compute group index for the core: either it's out of bounds
340		/// or the relay parent doesn't belong to the current session.
341		InvalidAssignment,
342		/// Invalid group index in core assignment.
343		InvalidGroupIndex,
344		/// Insufficient (non-majority) backing.
345		InsufficientBacking,
346		/// Invalid (bad signature, unknown validator, etc.) backing.
347		InvalidBacking,
348		/// The validation data hash does not match expected.
349		ValidationDataHashMismatch,
350		/// The downward message queue is not processed correctly.
351		IncorrectDownwardMessageHandling,
352		/// At least one upward message sent does not pass the acceptance criteria.
353		InvalidUpwardMessages,
354		/// The candidate didn't follow the rules of HRMP watermark advancement.
355		HrmpWatermarkMishandling,
356		/// The HRMP messages sent by the candidate is not valid.
357		InvalidOutboundHrmp,
358		/// The validation code hash of the candidate is not valid.
359		InvalidValidationCodeHash,
360		/// The `para_head` hash in the candidate descriptor doesn't match the hash of the actual
361		/// para head in the commitments.
362		ParaHeadMismatch,
363	}
364
365	/// Candidates pending availability by `ParaId`. They form a chain starting from the latest
366	/// included head of the para.
367	/// Use a different prefix post-migration to v1, since the v0 `PendingAvailability` storage
368	/// would otherwise have the exact same prefix which could cause undefined behaviour when doing
369	/// the migration.
370	#[pallet::storage]
371	#[pallet::storage_prefix = "V1"]
372	pub(crate) type PendingAvailability<T: Config> = StorageMap<
373		_,
374		Twox64Concat,
375		ParaId,
376		VecDeque<CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>>,
377	>;
378
379	#[pallet::call]
380	impl<T: Config> Pallet<T> {}
381}
382
383const LOG_TARGET: &str = "runtime::inclusion";
384
385/// The reason that a candidate's outputs were rejected for.
386#[derive(Debug)]
387enum AcceptanceCheckErr {
388	HeadDataTooLarge,
389	/// Code upgrades are not permitted at the current time.
390	PrematureCodeUpgrade,
391	/// The new runtime blob is too large.
392	NewCodeTooLarge,
393	/// The candidate violated this DMP acceptance criteria.
394	ProcessedDownwardMessages,
395	/// The candidate violated this UMP acceptance criteria.
396	UpwardMessages,
397	/// The candidate violated this HRMP watermark acceptance criteria.
398	HrmpWatermark,
399	/// The candidate violated this outbound HRMP acceptance criteria.
400	OutboundHrmp,
401}
402
403impl From<dmp::ProcessedDownwardMessagesAcceptanceErr> for AcceptanceCheckErr {
404	fn from(_: dmp::ProcessedDownwardMessagesAcceptanceErr) -> Self {
405		Self::ProcessedDownwardMessages
406	}
407}
408
409impl From<UmpAcceptanceCheckErr> for AcceptanceCheckErr {
410	fn from(_: UmpAcceptanceCheckErr) -> Self {
411		Self::UpwardMessages
412	}
413}
414
415impl<BlockNumber> From<hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>> for AcceptanceCheckErr {
416	fn from(_: hrmp::HrmpWatermarkAcceptanceErr<BlockNumber>) -> Self {
417		Self::HrmpWatermark
418	}
419}
420
421impl From<hrmp::OutboundHrmpAcceptanceErr> for AcceptanceCheckErr {
422	fn from(_: hrmp::OutboundHrmpAcceptanceErr) -> Self {
423		Self::OutboundHrmp
424	}
425}
426
427/// An error returned by [`Pallet::check_upward_messages`] that indicates a violation of one of
428/// acceptance criteria rules.
429#[cfg_attr(test, derive(PartialEq))]
430#[allow(dead_code)]
431pub(crate) enum UmpAcceptanceCheckErr {
432	/// The maximal number of messages that can be submitted in one batch was exceeded.
433	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
434	/// The maximal size of a single message was exceeded.
435	MessageSize { idx: u32, msg_size: u32, max_size: u32 },
436	/// The allowed number of messages in the queue was exceeded.
437	CapacityExceeded { count: u64, limit: u64 },
438	/// The allowed combined message size in the queue was exceeded.
439	TotalSizeExceeded { total_size: u64, limit: u64 },
440	/// A para-chain cannot send UMP messages while it is offboarding.
441	IsOffboarding,
442}
443
444impl fmt::Debug for UmpAcceptanceCheckErr {
445	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
446		match *self {
447			UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted } => write!(
448				fmt,
449				"more upward messages than permitted by config ({} > {})",
450				sent, permitted,
451			),
452			UmpAcceptanceCheckErr::MessageSize { idx, msg_size, max_size } => write!(
453				fmt,
454				"upward message idx {} larger than permitted by config ({} > {})",
455				idx, msg_size, max_size,
456			),
457			UmpAcceptanceCheckErr::CapacityExceeded { count, limit } => write!(
458				fmt,
459				"the ump queue would have more items than permitted by config ({} > {})",
460				count, limit,
461			),
462			UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit } => write!(
463				fmt,
464				"the ump queue would have grown past the max size permitted by config ({} > {})",
465				total_size, limit,
466			),
467			UmpAcceptanceCheckErr::IsOffboarding => {
468				write!(fmt, "upward message rejected because the para is off-boarding")
469			},
470		}
471	}
472}
473
474impl<T: Config> Pallet<T> {
475	/// Block initialization logic, called by initializer.
476	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
477		Weight::zero()
478	}
479
480	/// Block finalization logic, called by initializer.
481	pub(crate) fn initializer_finalize() {}
482
483	/// Handle an incoming session change.
484	pub(crate) fn initializer_on_new_session(
485		_notification: &crate::initializer::SessionChangeNotification<BlockNumberFor<T>>,
486		outgoing_paras: &[ParaId],
487	) {
488		// unlike most drain methods, drained elements are not cleared on `Drop` of the iterator
489		// and require consumption.
490		for _ in PendingAvailability::<T>::drain() {}
491
492		Self::cleanup_outgoing_ump_dispatch_queues(outgoing_paras);
493	}
494
495	pub(crate) fn cleanup_outgoing_ump_dispatch_queues(outgoing: &[ParaId]) {
496		for outgoing_para in outgoing {
497			Self::cleanup_outgoing_ump_dispatch_queue(*outgoing_para);
498		}
499	}
500
501	pub(crate) fn cleanup_outgoing_ump_dispatch_queue(para: ParaId) {
502		T::MessageQueue::sweep_queue(AggregateMessageOrigin::Ump(UmpQueueId::Para(para)));
503	}
504
505	pub(crate) fn get_occupied_cores(
506	) -> impl Iterator<Item = (CoreIndex, CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>)>
507	{
508		PendingAvailability::<T>::iter_values().flat_map(|pending_candidates| {
509			pending_candidates.into_iter().map(|c| (c.core, c.clone()))
510		})
511	}
512
513	/// Extract the freed cores based on cores that became available.
514	///
515	/// Bitfields are expected to have been sanitized already. E.g. via `sanitize_bitfields`!
516	///
517	/// Updates storage items `PendingAvailability`.
518	///
519	/// Returns a `Vec` of `CandidateHash`es and their respective `AvailabilityCore`s that became
520	/// available, and cores free.
521	pub(crate) fn update_pending_availability_and_get_freed_cores(
522		validators: &[ValidatorId],
523		signed_bitfields: SignedAvailabilityBitfields,
524	) -> (Weight, Vec<(CoreIndex, CandidateHash)>) {
525		let threshold = availability_threshold(validators.len());
526
527		let mut votes_per_core: BTreeMap<CoreIndex, BTreeSet<ValidatorIndex>> = BTreeMap::new();
528
529		for (checked_bitfield, validator_index) in
530			signed_bitfields.into_iter().map(|signed_bitfield| {
531				let validator_idx = signed_bitfield.validator_index();
532				let checked_bitfield = signed_bitfield.into_payload();
533				(checked_bitfield, validator_idx)
534			}) {
535			for (bit_idx, _) in checked_bitfield.0.iter().enumerate().filter(|(_, is_av)| **is_av) {
536				let core_index = CoreIndex(bit_idx as u32);
537				votes_per_core
538					.entry(core_index)
539					.or_insert_with(|| BTreeSet::new())
540					.insert(validator_index);
541			}
542		}
543
544		let mut freed_cores = vec![];
545		let mut weight = Weight::zero();
546
547		let pending_paraids: Vec<_> = PendingAvailability::<T>::iter_keys().collect();
548		for paraid in pending_paraids {
549			PendingAvailability::<T>::mutate(paraid, |candidates| {
550				if let Some(candidates) = candidates {
551					let mut last_enacted_index: Option<usize> = None;
552
553					for (candidate_index, candidate) in candidates.iter_mut().enumerate() {
554						if let Some(validator_indices) = votes_per_core.remove(&candidate.core) {
555							for validator_index in validator_indices.iter() {
556								// defensive check - this is constructed by loading the
557								// availability bitfield record, which is always `Some` if
558								// the core is occupied - that's why we're here.
559								if let Some(mut bit) =
560									candidate.availability_votes.get_mut(validator_index.0 as usize)
561								{
562									*bit = true;
563								}
564							}
565						}
566
567						// We check for the candidate's availability even if we didn't get any new
568						// bitfields for its core, as it may have already been available at a
569						// previous block but wasn't enacted due to its predecessors not being
570						// available.
571						if candidate.availability_votes.count_ones() >= threshold {
572							// We can only enact a candidate if we've enacted all of its
573							// predecessors already.
574							let can_enact = if candidate_index == 0 {
575								last_enacted_index == None
576							} else {
577								let prev_candidate_index = usize::try_from(candidate_index - 1)
578									.expect("Previous `if` would have caught a 0 candidate index.");
579								matches!(last_enacted_index, Some(old_index) if old_index == prev_candidate_index)
580							};
581
582							if can_enact {
583								last_enacted_index = Some(candidate_index);
584							}
585						}
586					}
587
588					// Trim the pending availability candidates storage and enact candidates of this
589					// para now.
590					if let Some(last_enacted_index) = last_enacted_index {
591						let evicted_candidates = candidates.drain(0..=last_enacted_index);
592						for candidate in evicted_candidates {
593							freed_cores.push((candidate.core, candidate.hash));
594
595							let receipt = CommittedCandidateReceipt {
596								descriptor: candidate.descriptor,
597								commitments: candidate.commitments,
598							};
599
600							let has_runtime_upgrade =
601								receipt.commitments.new_validation_code.as_ref().map_or(0, |_| 1);
602							let u = receipt.commitments.upward_messages.len() as u32;
603							let h = receipt.commitments.horizontal_messages.len() as u32;
604							let enact_weight = <T as Config>::WeightInfo::enact_candidate(
605								u,
606								h,
607								has_runtime_upgrade,
608							);
609							Self::enact_candidate(
610								candidate.relay_parent_number,
611								receipt,
612								candidate.backers,
613								candidate.availability_votes,
614								candidate.core,
615								candidate.backing_group,
616							);
617							weight.saturating_accrue(enact_weight);
618						}
619					}
620				}
621			});
622		}
623		// For relay chain blocks, we're (ab)using the proof size
624		// to limit the raw transaction size of `ParaInherent` and
625		// there's no state proof (aka PoV) associated with it.
626		// Since we already accounted for bitfields size, we should
627		// not include `enact_candidate` PoV impact here.
628		(weight.set_proof_size(0), freed_cores)
629	}
630
631	/// Process candidates that have been backed. Provide a set of
632	/// candidates along with their scheduled cores.
633	///
634	/// Candidates of the same paraid should be sorted according to their dependency order (they
635	/// should form a chain). If this condition is not met, this function will return an error.
636	/// (This really should not happen here, if the candidates were properly sanitised in
637	/// paras_inherent).
638	pub(crate) fn process_candidates<GV>(
639		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
640		candidates: &BTreeMap<ParaId, Vec<(BackedCandidate<T::Hash>, CoreIndex)>>,
641		group_validators: GV,
642	) -> Result<
643		Vec<(CandidateReceipt<T::Hash>, Vec<(ValidatorIndex, ValidityAttestation)>)>,
644		DispatchError,
645	>
646	where
647		GV: Fn(GroupIndex) -> Option<Vec<ValidatorIndex>>,
648	{
649		if candidates.is_empty() {
650			return Ok(Default::default())
651		}
652
653		let now = frame_system::Pallet::<T>::block_number();
654		let validators = shared::ActiveValidatorKeys::<T>::get();
655
656		// Collect candidate receipts with backers.
657		let mut candidate_receipt_with_backing_validator_indices =
658			Vec::with_capacity(candidates.len());
659
660		for (para_id, para_candidates) in candidates {
661			let mut latest_head_data = match Self::para_latest_head_data(para_id) {
662				None => {
663					defensive!("Latest included head data for paraid {:?} is None", para_id);
664					continue
665				},
666				Some(latest_head_data) => latest_head_data,
667			};
668
669			for (candidate, core) in para_candidates.iter() {
670				let candidate_hash = candidate.candidate().hash();
671
672				// The previous context is None, as it's already checked during candidate
673				// sanitization.
674				let check_ctx = CandidateCheckContext::<T>::new(None);
675				let relay_parent_number = check_ctx.verify_backed_candidate(
676					&allowed_relay_parents,
677					candidate.candidate(),
678					latest_head_data.clone(),
679				)?;
680
681				// The candidate based upon relay parent `N` should be backed by a
682				// group assigned to core at block `N + 1`. Thus,
683				// `relay_parent_number + 1` will always land in the current
684				// session.
685				let group_idx = scheduler::Pallet::<T>::group_assigned_to_core(
686					*core,
687					relay_parent_number + One::one(),
688				)
689				.ok_or_else(|| {
690					log::warn!(
691						target: LOG_TARGET,
692						"Failed to compute group index for candidate {:?}",
693						candidate_hash
694					);
695					Error::<T>::InvalidAssignment
696				})?;
697				let group_vals =
698					group_validators(group_idx).ok_or_else(|| Error::<T>::InvalidGroupIndex)?;
699
700				// Check backing vote count and validity.
701				let (backers, backer_idx_and_attestation) =
702					Self::check_backing_votes(candidate, &validators, group_vals)?;
703
704				// Found a valid candidate.
705				latest_head_data = candidate.candidate().commitments.head_data.clone();
706				candidate_receipt_with_backing_validator_indices
707					.push((candidate.receipt(), backer_idx_and_attestation));
708
709				// Update storage now
710				PendingAvailability::<T>::mutate(&para_id, |pending_availability| {
711					let new_candidate = CandidatePendingAvailability {
712						core: *core,
713						hash: candidate_hash,
714						descriptor: candidate.candidate().descriptor.clone(),
715						commitments: candidate.candidate().commitments.clone(),
716						// initialize all availability votes to 0.
717						availability_votes: bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()],
718						relay_parent_number,
719						backers: backers.to_bitvec(),
720						backed_in_number: now,
721						backing_group: group_idx,
722					};
723
724					if let Some(pending_availability) = pending_availability {
725						pending_availability.push_back(new_candidate);
726					} else {
727						*pending_availability =
728							Some([new_candidate].into_iter().collect::<VecDeque<_>>())
729					}
730				});
731
732				// Deposit backed event.
733				Self::deposit_event(Event::<T>::CandidateBacked(
734					candidate.candidate().to_plain(),
735					candidate.candidate().commitments.head_data.clone(),
736					*core,
737					group_idx,
738				));
739			}
740		}
741
742		Ok(candidate_receipt_with_backing_validator_indices)
743	}
744
745	// Get the latest backed output head data of this para (including pending availability).
746	pub(crate) fn para_latest_head_data(para_id: &ParaId) -> Option<HeadData> {
747		match PendingAvailability::<T>::get(para_id).and_then(|pending_candidates| {
748			pending_candidates.back().map(|x| x.commitments.head_data.clone())
749		}) {
750			Some(head_data) => Some(head_data),
751			None => paras::Heads::<T>::get(para_id),
752		}
753	}
754
755	// Get the relay parent number of the most recent candidate (including pending availability).
756	pub(crate) fn para_most_recent_context(para_id: &ParaId) -> Option<BlockNumberFor<T>> {
757		match PendingAvailability::<T>::get(para_id)
758			.and_then(|pending_candidates| pending_candidates.back().map(|x| x.relay_parent_number))
759		{
760			Some(relay_parent_number) => Some(relay_parent_number),
761			None => paras::MostRecentContext::<T>::get(para_id),
762		}
763	}
764
765	fn check_backing_votes(
766		backed_candidate: &BackedCandidate<T::Hash>,
767		validators: &[ValidatorId],
768		group_vals: Vec<ValidatorIndex>,
769	) -> Result<(BitVec<u8, BitOrderLsb0>, Vec<(ValidatorIndex, ValidityAttestation)>), Error<T>> {
770		let minimum_backing_votes = configuration::ActiveConfig::<T>::get().minimum_backing_votes;
771
772		let mut backers = bitvec::bitvec![u8, BitOrderLsb0; 0; validators.len()];
773		let signing_context = SigningContext {
774			parent_hash: backed_candidate.descriptor().relay_parent(),
775			session_index: shared::CurrentSessionIndex::<T>::get(),
776		};
777
778		let (validator_indices, _) = backed_candidate.validator_indices_and_core_index();
779
780		// check the signatures in the backing and that it is a majority.
781		let maybe_amount_validated = polkadot_primitives::check_candidate_backing(
782			backed_candidate.candidate().hash(),
783			backed_candidate.validity_votes(),
784			validator_indices,
785			&signing_context,
786			group_vals.len(),
787			|intra_group_vi| {
788				group_vals
789					.get(intra_group_vi)
790					.and_then(|vi| validators.get(vi.0 as usize))
791					.map(|v| v.clone())
792			},
793		);
794
795		match maybe_amount_validated {
796			Ok(amount_validated) => ensure!(
797				amount_validated >=
798					effective_minimum_backing_votes(group_vals.len(), minimum_backing_votes),
799				Error::<T>::InsufficientBacking,
800			),
801			Err(()) => {
802				Err(Error::<T>::InvalidBacking)?;
803			},
804		}
805
806		let mut backer_idx_and_attestation =
807			Vec::<(ValidatorIndex, ValidityAttestation)>::with_capacity(
808				validator_indices.count_ones(),
809			);
810
811		for ((bit_idx, _), attestation) in validator_indices
812			.iter()
813			.enumerate()
814			.filter(|(_, signed)| **signed)
815			.zip(backed_candidate.validity_votes().iter().cloned())
816		{
817			let val_idx = group_vals.get(bit_idx).expect("this query succeeded above; qed");
818			backer_idx_and_attestation.push((*val_idx, attestation));
819
820			backers.set(val_idx.0 as _, true);
821		}
822
823		Ok((backers, backer_idx_and_attestation))
824	}
825
826	/// Run the acceptance criteria checks on the given candidate commitments.
827	pub(crate) fn check_validation_outputs_for_runtime_api(
828		para_id: ParaId,
829		relay_parent_number: BlockNumberFor<T>,
830		validation_outputs: polkadot_primitives::CandidateCommitments,
831	) -> bool {
832		let prev_context = Self::para_most_recent_context(&para_id);
833		let check_ctx = CandidateCheckContext::<T>::new(prev_context);
834
835		if let Err(err) = check_ctx.check_validation_outputs(
836			para_id,
837			relay_parent_number,
838			&validation_outputs.head_data,
839			&validation_outputs.new_validation_code,
840			validation_outputs.processed_downward_messages,
841			&validation_outputs.upward_messages,
842			BlockNumberFor::<T>::from(validation_outputs.hrmp_watermark),
843			&validation_outputs.horizontal_messages,
844		) {
845			log::debug!(
846				target: LOG_TARGET,
847				"Validation outputs checking for parachain `{}` failed, error: {:?}",
848				u32::from(para_id), err
849			);
850			false
851		} else {
852			true
853		}
854	}
855
856	fn enact_candidate(
857		relay_parent_number: BlockNumberFor<T>,
858		receipt: CommittedCandidateReceipt<T::Hash>,
859		backers: BitVec<u8, BitOrderLsb0>,
860		availability_votes: BitVec<u8, BitOrderLsb0>,
861		core_index: CoreIndex,
862		backing_group: GroupIndex,
863	) {
864		let plain = receipt.to_plain();
865		let commitments = receipt.commitments;
866		let config = configuration::ActiveConfig::<T>::get();
867
868		T::RewardValidators::reward_backing(
869			backers
870				.iter()
871				.enumerate()
872				.filter(|(_, backed)| **backed)
873				.map(|(i, _)| ValidatorIndex(i as _)),
874		);
875
876		T::RewardValidators::reward_bitfields(
877			availability_votes
878				.iter()
879				.enumerate()
880				.filter(|(_, voted)| **voted)
881				.map(|(i, _)| ValidatorIndex(i as _)),
882		);
883
884		if let Some(new_code) = commitments.new_validation_code {
885			// Block number of candidate's inclusion.
886			let now = frame_system::Pallet::<T>::block_number();
887
888			paras::Pallet::<T>::schedule_code_upgrade(
889				receipt.descriptor.para_id(),
890				new_code,
891				now,
892				&config,
893				UpgradeStrategy::SetGoAheadSignal,
894			);
895		}
896
897		// enact the messaging facet of the candidate.
898		dmp::Pallet::<T>::prune_dmq(
899			receipt.descriptor.para_id(),
900			commitments.processed_downward_messages,
901		);
902		Self::receive_upward_messages(
903			receipt.descriptor.para_id(),
904			commitments.upward_messages.as_slice(),
905		);
906		hrmp::Pallet::<T>::prune_hrmp(
907			receipt.descriptor.para_id(),
908			BlockNumberFor::<T>::from(commitments.hrmp_watermark),
909		);
910		hrmp::Pallet::<T>::queue_outbound_hrmp(
911			receipt.descriptor.para_id(),
912			commitments.horizontal_messages,
913		);
914
915		Self::deposit_event(Event::<T>::CandidateIncluded(
916			plain,
917			commitments.head_data.clone(),
918			core_index,
919			backing_group,
920		));
921
922		paras::Pallet::<T>::note_new_head(
923			receipt.descriptor.para_id(),
924			commitments.head_data,
925			relay_parent_number,
926		);
927	}
928
929	pub(crate) fn relay_dispatch_queue_size(para_id: ParaId) -> (u32, u32) {
930		let fp = T::MessageQueue::footprint(AggregateMessageOrigin::Ump(UmpQueueId::Para(para_id)));
931		(fp.storage.count as u32, fp.storage.size as u32)
932	}
933
934	/// Check that all the upward messages sent by a candidate pass the acceptance criteria.
935	pub(crate) fn check_upward_messages(
936		config: &HostConfiguration<BlockNumberFor<T>>,
937		para: ParaId,
938		upward_messages: &[UpwardMessage],
939	) -> Result<(), UmpAcceptanceCheckErr> {
940		// Filter any pending UMP signals and the separator.
941		let upward_messages = skip_ump_signals(upward_messages.iter()).collect::<Vec<_>>();
942
943		// Cannot send UMP messages while off-boarding.
944		if paras::Pallet::<T>::is_offboarding(para) {
945			ensure!(upward_messages.is_empty(), UmpAcceptanceCheckErr::IsOffboarding);
946		}
947
948		let additional_msgs = upward_messages.len() as u32;
949		if additional_msgs > config.max_upward_message_num_per_candidate {
950			return Err(UmpAcceptanceCheckErr::MoreMessagesThanPermitted {
951				sent: additional_msgs,
952				permitted: config.max_upward_message_num_per_candidate,
953			})
954		}
955
956		let (para_queue_count, mut para_queue_size) = Self::relay_dispatch_queue_size(para);
957
958		if para_queue_count.saturating_add(additional_msgs) > config.max_upward_queue_count {
959			return Err(UmpAcceptanceCheckErr::CapacityExceeded {
960				count: para_queue_count.saturating_add(additional_msgs).into(),
961				limit: config.max_upward_queue_count.into(),
962			})
963		}
964
965		for (idx, msg) in upward_messages.into_iter().enumerate() {
966			let msg_size = msg.len() as u32;
967			if msg_size > config.max_upward_message_size {
968				return Err(UmpAcceptanceCheckErr::MessageSize {
969					idx: idx as u32,
970					msg_size,
971					max_size: config.max_upward_message_size,
972				})
973			}
974			// make sure that the queue is not overfilled.
975			// we do it here only once since returning false invalidates the whole relay-chain
976			// block.
977			if para_queue_size.saturating_add(msg_size) > config.max_upward_queue_size {
978				return Err(UmpAcceptanceCheckErr::TotalSizeExceeded {
979					total_size: para_queue_size.saturating_add(msg_size).into(),
980					limit: config.max_upward_queue_size.into(),
981				})
982			}
983			para_queue_size.saturating_accrue(msg_size);
984		}
985
986		Ok(())
987	}
988
989	/// Enqueues `upward_messages` from a `para`'s accepted candidate block.
990	///
991	/// This function is infallible since the candidate was already accepted and we therefore need
992	/// to deal with the messages as given. Messages that are too long will be ignored since such
993	/// candidates should have already been rejected in [`Self::check_upward_messages`].
994	pub(crate) fn receive_upward_messages(para: ParaId, upward_messages: &[Vec<u8>]) {
995		let bounded = skip_ump_signals(upward_messages.iter())
996			.filter_map(|d| {
997				BoundedSlice::try_from(&d[..])
998					.inspect_err(|_| {
999						defensive!("Accepted candidate contains too long msg, len=", d.len());
1000					})
1001					.ok()
1002			})
1003			.collect();
1004		Self::receive_bounded_upward_messages(para, bounded)
1005	}
1006
1007	/// Enqueues storage-bounded `upward_messages` from a `para`'s accepted candidate block.
1008	pub(crate) fn receive_bounded_upward_messages(
1009		para: ParaId,
1010		messages: Vec<BoundedSlice<'_, u8, MaxUmpMessageLenOf<T>>>,
1011	) {
1012		let count = messages.len() as u32;
1013		if count == 0 {
1014			return
1015		}
1016
1017		T::MessageQueue::enqueue_messages(
1018			messages.into_iter(),
1019			AggregateMessageOrigin::Ump(UmpQueueId::Para(para)),
1020		);
1021		Self::deposit_event(Event::UpwardMessagesReceived { from: para, count });
1022	}
1023
1024	/// Cleans up all timed out candidates as well as their descendant candidates.
1025	///
1026	/// Returns a vector of cleaned-up core IDs.
1027	pub(crate) fn free_timedout() -> Vec<CoreIndex> {
1028		let timeout_pred = scheduler::Pallet::<T>::availability_timeout_predicate();
1029
1030		let timed_out: Vec<_> = Self::free_failed_cores(
1031			|candidate| timeout_pred(candidate.backed_in_number).timed_out,
1032			None,
1033		)
1034		.collect();
1035
1036		let mut timed_out_cores = Vec::with_capacity(timed_out.len());
1037		for candidate in timed_out.iter() {
1038			timed_out_cores.push(candidate.core);
1039
1040			let receipt = CandidateReceipt {
1041				descriptor: candidate.descriptor.clone(),
1042				commitments_hash: candidate.commitments.hash(),
1043			};
1044
1045			Self::deposit_event(Event::<T>::CandidateTimedOut(
1046				receipt,
1047				candidate.commitments.head_data.clone(),
1048				candidate.core,
1049			));
1050		}
1051
1052		timed_out_cores
1053	}
1054
1055	/// Cleans up all cores pending availability occupied by one of the disputed candidates or which
1056	/// are descendants of a disputed candidate.
1057	///
1058	/// Returns a vector of cleaned-up core IDs, along with the evicted candidate hashes.
1059	pub(crate) fn free_disputed(
1060		disputed: &BTreeSet<CandidateHash>,
1061	) -> Vec<(CoreIndex, CandidateHash)> {
1062		Self::free_failed_cores(
1063			|candidate| disputed.contains(&candidate.hash),
1064			Some(disputed.len()),
1065		)
1066		.map(|candidate| (candidate.core, candidate.hash))
1067		.collect()
1068	}
1069
1070	// Clean up cores whose candidates are deemed as failed by the predicate. `pred` returns true if
1071	// a candidate is considered failed.
1072	// A failed candidate also frees all subsequent cores which hold descendants of said candidate.
1073	fn free_failed_cores<
1074		P: Fn(&CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>) -> bool,
1075	>(
1076		pred: P,
1077		capacity_hint: Option<usize>,
1078	) -> impl Iterator<Item = CandidatePendingAvailability<T::Hash, BlockNumberFor<T>>> {
1079		let mut earliest_dropped_indices: BTreeMap<ParaId, usize> = BTreeMap::new();
1080
1081		for (para_id, pending_candidates) in PendingAvailability::<T>::iter() {
1082			// We assume that pending candidates are stored in dependency order. So we need to store
1083			// the earliest dropped candidate. All others that follow will get freed as well.
1084			let mut earliest_dropped_idx = None;
1085			for (index, candidate) in pending_candidates.iter().enumerate() {
1086				if pred(candidate) {
1087					earliest_dropped_idx = Some(index);
1088					// Since we're looping the candidates in dependency order, we've found the
1089					// earliest failed index for this paraid.
1090					break;
1091				}
1092			}
1093
1094			if let Some(earliest_dropped_idx) = earliest_dropped_idx {
1095				earliest_dropped_indices.insert(para_id, earliest_dropped_idx);
1096			}
1097		}
1098
1099		let mut cleaned_up_cores =
1100			if let Some(capacity) = capacity_hint { Vec::with_capacity(capacity) } else { vec![] };
1101
1102		for (para_id, earliest_dropped_idx) in earliest_dropped_indices {
1103			// Do cleanups and record the cleaned up cores
1104			PendingAvailability::<T>::mutate(&para_id, |record| {
1105				if let Some(record) = record {
1106					let cleaned_up = record.drain(earliest_dropped_idx..);
1107					cleaned_up_cores.extend(cleaned_up);
1108				}
1109			});
1110		}
1111
1112		cleaned_up_cores.into_iter()
1113	}
1114
1115	/// Forcibly enact the pending candidates of the given paraid as though they had been deemed
1116	/// available by bitfields.
1117	///
1118	/// Is a no-op if there is no candidate pending availability for this para-id.
1119	/// If there are multiple candidates pending availability for this para-id, it will enact all of
1120	/// them. This should generally not be used but it is useful during execution of Runtime APIs,
1121	/// where the changes to the state are expected to be discarded directly after.
1122	pub(crate) fn force_enact(para: ParaId) {
1123		PendingAvailability::<T>::mutate(&para, |candidates| {
1124			if let Some(candidates) = candidates {
1125				for candidate in candidates.drain(..) {
1126					let receipt = CommittedCandidateReceipt {
1127						descriptor: candidate.descriptor,
1128						commitments: candidate.commitments,
1129					};
1130
1131					Self::enact_candidate(
1132						candidate.relay_parent_number,
1133						receipt,
1134						candidate.backers,
1135						candidate.availability_votes,
1136						candidate.core,
1137						candidate.backing_group,
1138					);
1139				}
1140			}
1141		});
1142	}
1143
1144	/// Returns the first `CommittedCandidateReceipt` pending availability for the para provided, if
1145	/// any.
1146	/// A para_id could have more than one candidates pending availability, if it's using elastic
1147	/// scaling. These candidates form a chain. This function returns the first in the chain.
1148	pub(crate) fn first_candidate_pending_availability(
1149		para: ParaId,
1150	) -> Option<CommittedCandidateReceipt<T::Hash>> {
1151		PendingAvailability::<T>::get(&para).and_then(|p| {
1152			p.get(0).map(|p| CommittedCandidateReceipt {
1153				descriptor: p.descriptor.clone(),
1154				commitments: p.commitments.clone(),
1155			})
1156		})
1157	}
1158
1159	/// Returns all the `CommittedCandidateReceipt` pending availability for the para provided, if
1160	/// any.
1161	pub(crate) fn candidates_pending_availability(
1162		para: ParaId,
1163	) -> Vec<CommittedCandidateReceipt<T::Hash>> {
1164		<PendingAvailability<T>>::get(&para)
1165			.map(|candidates| {
1166				candidates
1167					.into_iter()
1168					.map(|candidate| CommittedCandidateReceipt {
1169						descriptor: candidate.descriptor.clone(),
1170						commitments: candidate.commitments.clone(),
1171					})
1172					.collect()
1173			})
1174			.unwrap_or_default()
1175	}
1176}
1177
1178const fn availability_threshold(n_validators: usize) -> usize {
1179	supermajority_threshold(n_validators)
1180}
1181
1182impl AcceptanceCheckErr {
1183	/// Returns the same error so that it can be threaded through a needle of `DispatchError` and
1184	/// ultimately returned from a `Dispatchable`.
1185	fn strip_into_dispatch_err<T: Config>(self) -> Error<T> {
1186		use AcceptanceCheckErr::*;
1187		match self {
1188			HeadDataTooLarge => Error::<T>::HeadDataTooLarge,
1189			PrematureCodeUpgrade => Error::<T>::PrematureCodeUpgrade,
1190			NewCodeTooLarge => Error::<T>::NewCodeTooLarge,
1191			ProcessedDownwardMessages => Error::<T>::IncorrectDownwardMessageHandling,
1192			UpwardMessages => Error::<T>::InvalidUpwardMessages,
1193			HrmpWatermark => Error::<T>::HrmpWatermarkMishandling,
1194			OutboundHrmp => Error::<T>::InvalidOutboundHrmp,
1195		}
1196	}
1197}
1198
1199impl<T: Config> OnQueueChanged<AggregateMessageOrigin> for Pallet<T> {
1200	// Write back the remaining queue capacity into `relay_dispatch_queue_remaining_capacity`.
1201	fn on_queue_changed(origin: AggregateMessageOrigin, fp: QueueFootprint) {
1202		let para = match origin {
1203			AggregateMessageOrigin::Ump(UmpQueueId::Para(p)) => p,
1204		};
1205		let QueueFootprint { storage: Footprint { count, size }, .. } = fp;
1206		let (count, size) = (count.saturated_into(), size.saturated_into());
1207		// TODO paritytech/polkadot#6283: Remove all usages of `relay_dispatch_queue_size`
1208		#[allow(deprecated)]
1209		well_known_keys::relay_dispatch_queue_size_typed(para).set((count, size));
1210
1211		let config = configuration::ActiveConfig::<T>::get();
1212		let remaining_count = config.max_upward_queue_count.saturating_sub(count);
1213		let remaining_size = config.max_upward_queue_size.saturating_sub(size);
1214		well_known_keys::relay_dispatch_queue_remaining_capacity(para)
1215			.set((remaining_count, remaining_size));
1216	}
1217}
1218
1219/// A collection of data required for checking a candidate.
1220pub(crate) struct CandidateCheckContext<T: Config> {
1221	config: configuration::HostConfiguration<BlockNumberFor<T>>,
1222	prev_context: Option<BlockNumberFor<T>>,
1223}
1224
1225impl<T: Config> CandidateCheckContext<T> {
1226	pub(crate) fn new(prev_context: Option<BlockNumberFor<T>>) -> Self {
1227		Self { config: configuration::ActiveConfig::<T>::get(), prev_context }
1228	}
1229
1230	/// Execute verification of the candidate.
1231	///
1232	/// Assures:
1233	///  * relay-parent in-bounds
1234	///  * code hash of commitments matches current code hash
1235	///  * para head in the descriptor and commitments match
1236	///
1237	/// Returns the relay-parent block number.
1238	pub(crate) fn verify_backed_candidate(
1239		&self,
1240		allowed_relay_parents: &AllowedRelayParentsTracker<T::Hash, BlockNumberFor<T>>,
1241		backed_candidate_receipt: &CommittedCandidateReceipt<<T as frame_system::Config>::Hash>,
1242		parent_head_data: HeadData,
1243	) -> Result<BlockNumberFor<T>, Error<T>> {
1244		let para_id = backed_candidate_receipt.descriptor.para_id();
1245		let relay_parent = backed_candidate_receipt.descriptor.relay_parent();
1246
1247		// Check that the relay-parent is one of the allowed relay-parents.
1248		let (state_root, relay_parent_number) = {
1249			match allowed_relay_parents.acquire_info(relay_parent, self.prev_context) {
1250				None => return Err(Error::<T>::DisallowedRelayParent),
1251				Some((info, relay_parent_number)) => (info.state_root, relay_parent_number),
1252			}
1253		};
1254
1255		{
1256			let persisted_validation_data = make_persisted_validation_data_with_parent::<T>(
1257				relay_parent_number,
1258				state_root,
1259				parent_head_data,
1260			);
1261
1262			let expected = persisted_validation_data.hash();
1263
1264			ensure!(
1265				expected == backed_candidate_receipt.descriptor.persisted_validation_data_hash(),
1266				Error::<T>::ValidationDataHashMismatch,
1267			);
1268		}
1269
1270		let validation_code_hash = paras::CurrentCodeHash::<T>::get(para_id)
1271			// A candidate for a parachain without current validation code is not scheduled.
1272			.ok_or_else(|| Error::<T>::UnscheduledCandidate)?;
1273		ensure!(
1274			backed_candidate_receipt.descriptor.validation_code_hash() == validation_code_hash,
1275			Error::<T>::InvalidValidationCodeHash,
1276		);
1277
1278		ensure!(
1279			backed_candidate_receipt.descriptor.para_head() ==
1280				backed_candidate_receipt.commitments.head_data.hash(),
1281			Error::<T>::ParaHeadMismatch,
1282		);
1283
1284		if let Err(err) = self.check_validation_outputs(
1285			para_id,
1286			relay_parent_number,
1287			&backed_candidate_receipt.commitments.head_data,
1288			&backed_candidate_receipt.commitments.new_validation_code,
1289			backed_candidate_receipt.commitments.processed_downward_messages,
1290			&backed_candidate_receipt.commitments.upward_messages,
1291			BlockNumberFor::<T>::from(backed_candidate_receipt.commitments.hrmp_watermark),
1292			&backed_candidate_receipt.commitments.horizontal_messages,
1293		) {
1294			log::debug!(
1295				target: LOG_TARGET,
1296				"Validation outputs checking during inclusion of a candidate {:?} for parachain `{}` failed, error: {:?}",
1297				backed_candidate_receipt.hash(),
1298				u32::from(para_id),
1299				err
1300			);
1301			Err(err.strip_into_dispatch_err::<T>())?;
1302		};
1303		Ok(relay_parent_number)
1304	}
1305
1306	/// Check the given outputs after candidate validation on whether it passes the acceptance
1307	/// criteria.
1308	///
1309	/// The things that are checked can be roughly divided into limits and minimums.
1310	///
1311	/// Limits are things like max message queue sizes and max head data size.
1312	///
1313	/// Minimums are things like the minimum amount of messages that must be processed
1314	/// by the parachain block.
1315	///
1316	/// Limits are checked against the current state. The parachain block must be acceptable
1317	/// by the current relay-chain state regardless of whether it was acceptable at some relay-chain
1318	/// state in the past.
1319	///
1320	/// Minimums are checked against the current state but modulated by
1321	/// considering the information available at the relay-parent of the parachain block.
1322	fn check_validation_outputs(
1323		&self,
1324		para_id: ParaId,
1325		relay_parent_number: BlockNumberFor<T>,
1326		head_data: &HeadData,
1327		new_validation_code: &Option<polkadot_primitives::ValidationCode>,
1328		processed_downward_messages: u32,
1329		upward_messages: &[polkadot_primitives::UpwardMessage],
1330		hrmp_watermark: BlockNumberFor<T>,
1331		horizontal_messages: &[polkadot_primitives::OutboundHrmpMessage<ParaId>],
1332	) -> Result<(), AcceptanceCheckErr> {
1333		// Safe convertions when `self.config.max_head_data_size` is in bounds of `usize` type.
1334		let max_head_data_size = usize::try_from(self.config.max_head_data_size)
1335			.map_err(|_| AcceptanceCheckErr::HeadDataTooLarge)?;
1336		ensure!(head_data.0.len() <= max_head_data_size, AcceptanceCheckErr::HeadDataTooLarge);
1337
1338		// if any, the code upgrade attempt is allowed.
1339		if let Some(new_validation_code) = new_validation_code {
1340			// Safe convertions when `self.config.max_code_size` is in bounds of `usize` type.
1341			let max_code_size = usize::try_from(self.config.max_code_size)
1342				.map_err(|_| AcceptanceCheckErr::NewCodeTooLarge)?;
1343
1344			ensure!(
1345				paras::Pallet::<T>::can_upgrade_validation_code(para_id),
1346				AcceptanceCheckErr::PrematureCodeUpgrade,
1347			);
1348			ensure!(
1349				new_validation_code.0.len() <= max_code_size,
1350				AcceptanceCheckErr::NewCodeTooLarge,
1351			);
1352		}
1353
1354		// check if the candidate passes the messaging acceptance criteria
1355		dmp::Pallet::<T>::check_processed_downward_messages(
1356			para_id,
1357			relay_parent_number,
1358			processed_downward_messages,
1359		)
1360		.map_err(|e| {
1361			log::debug!(
1362				target: LOG_TARGET,
1363				"Check processed downward messages for parachain `{}` on relay parent number `{:?}` failed, error: {:?}",
1364				u32::from(para_id),
1365				relay_parent_number,
1366				e
1367			);
1368			e
1369		})?;
1370		Pallet::<T>::check_upward_messages(&self.config, para_id, upward_messages).map_err(
1371			|e| {
1372				log::debug!(
1373					target: LOG_TARGET,
1374					"Check upward messages for parachain `{}` failed, error: {:?}",
1375					u32::from(para_id),
1376					e
1377				);
1378				e
1379			},
1380		)?;
1381		hrmp::Pallet::<T>::check_hrmp_watermark(para_id, relay_parent_number, hrmp_watermark)
1382			.map_err(|e| {
1383				log::debug!(
1384					target: LOG_TARGET,
1385					"Check hrmp watermark for parachain `{}` on relay parent number `{:?}` failed, error: {:?}",
1386					u32::from(para_id),
1387					relay_parent_number,
1388					e
1389				);
1390				e
1391			})?;
1392		hrmp::Pallet::<T>::check_outbound_hrmp(&self.config, para_id, horizontal_messages)
1393			.map_err(|e| {
1394				log::debug!(
1395					target: LOG_TARGET,
1396					"Check outbound hrmp for parachain `{}` failed, error: {:?}",
1397					u32::from(para_id),
1398					e
1399				);
1400				e
1401			})?;
1402
1403		Ok(())
1404	}
1405}
1406
1407impl<T: Config> QueueFootprinter for Pallet<T> {
1408	type Origin = UmpQueueId;
1409
1410	fn message_count(origin: Self::Origin) -> u64 {
1411		T::MessageQueue::footprint(AggregateMessageOrigin::Ump(origin)).storage.count
1412	}
1413}