cumulus_pallet_xcmp_queue/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19//! * `XcmpMessageHandler`
20//! * `XcmpMessageSource`
21//!
22//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24//! `XcmExecutor` for dispatching incoming XCM messages.
25//!
26//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28//! The fee factor increases whenever the total size of messages in a particular channel passes a
29//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32//! It is defined in the channel configuration.
33//! - `THRESHOLD_FACTOR` just declares which percentage of the max size is the actual threshold.
34//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
60use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67	defensive, defensive_assert,
68	traits::{
69		Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70		QueuePausedQuery,
71	},
72	weights::{Weight, WeightMeter},
73};
74use pallet_message_queue::OnQueueChanged;
75use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
76use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
77use scale_info::TypeInfo;
78use sp_core::MAX_POSSIBLE_ALLOCATION;
79use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
80use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
81use xcm_builder::InspectMessageQueues;
82use xcm_executor::traits::ConvertOrigin;
83
84pub use pallet::*;
85
/// Index used to identify overweight XCMs.
pub type OverweightIndex = u64;
/// The max length of an XCMP message.
///
/// This is the `MaxMessageLen` of the message queue configured as [`Config::XcmpQueue`].
pub type MaxXcmpMessageLenOf<T> =
	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;

/// The target under which this pallet emits its log messages.
const LOG_TARGET: &str = "xcmp_queue";
/// 64 KiB, expressed in bytes.
// NOTE(review): no use of this constant is visible in this part of the file.
const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KiB
/// The size of an XCM messages batch.
pub const XCM_BATCH_SIZE: usize = 250;
/// The maximum number of signals that we can have in an XCMP page.
///
/// When handling a page of signals, decoding stops after this many signals.
pub const MAX_SIGNALS_PER_PAGE: usize = 3;
98
/// Constants related to delivery fee calculation
pub mod delivery_fee_constants {
	/// Fees will start increasing when queue is half full
	///
	/// The channel's `max_total_size` is divided by this factor to obtain the size threshold
	/// above which the delivery fee factor of the channel is increased.
	pub const THRESHOLD_FACTOR: u32 = 2;
}
104
105#[frame_support::pallet]
106pub mod pallet {
107	use super::*;
108	use frame_support::{pallet_prelude::*, Twox64Concat};
109	use frame_system::pallet_prelude::*;
110
	/// The pallet type. Its storage version is taken from [`migration::STORAGE_VERSION`].
	#[pallet::pallet]
	#[pallet::storage_version(migration::STORAGE_VERSION)]
	pub struct Pallet<T>(_);
114
	/// Configuration trait of this pallet.
	#[pallet::config]
	pub trait Config: frame_system::Config {
		/// The runtime event type; it must be constructible from this pallet's [`Event`] and be
		/// the same type as the `RuntimeEvent` of `frame_system`.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// Information on the available XCMP channels.
		type ChannelInfo: GetChannelInfo;

		/// Means of converting an `Xcm` into a `VersionedXcm`.
		type VersionWrapper: WrapVersion;

		/// Enqueue an inbound horizontal message for later processing.
		///
		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
		/// assumes that this hook will eventually process all the pushed messages.
		type XcmpQueue: EnqueueMessage<ParaId>
			+ QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;

		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
		///
		/// Any further channel suspensions will fail and messages may get dropped without further
		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
		/// [`InboundXcmpSuspended`] still applies at that scale.
		#[pallet::constant]
		type MaxInboundSuspended: Get<u32>;

		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
		///
		/// If this is reached, then no further messages can be sent to channels that do not yet
		/// have a message queued. This should be set to the expected maximum of outbound channels
		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
		/// since otherwise the congestion control protocol will not work as intended and messages
		/// may be dropped. This value increases the PoV and should therefore not be picked too
		/// high. Governance needs to pay attention to not open more channels than this value.
		#[pallet::constant]
		type MaxActiveOutboundChannels: Get<u32>;

		/// The maximal page size for HRMP message pages.
		///
		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
		/// benchmarking. The limit for the size of a message is slightly below this, since some
		/// overhead is incurred for encoding the format.
		#[pallet::constant]
		type MaxPageSize: Get<u32>;

		/// The origin that is allowed to resume or suspend the XCMP queue.
		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

		/// The conversion function used to attempt to convert an XCM `Location` origin to a
		/// superuser origin.
		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;

		/// The price for delivering an XCM to a sibling parachain destination.
		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;

		/// The weight information of this pallet.
		type WeightInfo: WeightInfoExt;
	}
173
174	#[pallet::call]
175	impl<T: Config> Pallet<T> {
176		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
177		///
178		/// - `origin`: Must pass `ControllerOrigin`.
179		#[pallet::call_index(1)]
180		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
181		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
182			T::ControllerOrigin::ensure_origin(origin)?;
183
184			QueueSuspended::<T>::try_mutate(|suspended| {
185				if *suspended {
186					Err(Error::<T>::AlreadySuspended.into())
187				} else {
188					*suspended = true;
189					Ok(())
190				}
191			})
192		}
193
194		/// Resumes all XCM executions for the XCMP queue.
195		///
196		/// Note that this function doesn't change the status of the in/out bound channels.
197		///
198		/// - `origin`: Must pass `ControllerOrigin`.
199		#[pallet::call_index(2)]
200		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
201		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
202			T::ControllerOrigin::ensure_origin(origin)?;
203
204			QueueSuspended::<T>::try_mutate(|suspended| {
205				if !*suspended {
206					Err(Error::<T>::AlreadyResumed.into())
207				} else {
208					*suspended = false;
209					Ok(())
210				}
211			})
212		}
213
214		/// Overwrites the number of pages which must be in the queue for the other side to be
215		/// told to suspend their sending.
216		///
217		/// - `origin`: Must pass `Root`.
218		/// - `new`: Desired value for `QueueConfigData.suspend_value`
219		#[pallet::call_index(3)]
220		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
221		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
222			ensure_root(origin)?;
223
224			QueueConfig::<T>::try_mutate(|data| {
225				data.suspend_threshold = new;
226				data.validate::<T>()
227			})
228		}
229
230		/// Overwrites the number of pages which must be in the queue after which we drop any
231		/// further messages from the channel.
232		///
233		/// - `origin`: Must pass `Root`.
234		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
235		#[pallet::call_index(4)]
236		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
237		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
238			ensure_root(origin)?;
239
240			QueueConfig::<T>::try_mutate(|data| {
241				data.drop_threshold = new;
242				data.validate::<T>()
243			})
244		}
245
246		/// Overwrites the number of pages which the queue must be reduced to before it signals
247		/// that message sending may recommence after it has been suspended.
248		///
249		/// - `origin`: Must pass `Root`.
250		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
251		#[pallet::call_index(5)]
252		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
253		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
254			ensure_root(origin)?;
255
256			QueueConfig::<T>::try_mutate(|data| {
257				data.resume_threshold = new;
258				data.validate::<T>()
259			})
260		}
261	}
262
263	#[pallet::hooks]
264	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
		/// Sanity checks of the pallet configuration; panics if one of them does not hold.
		fn integrity_test() {
			// A page size of zero could not hold any message.
			assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");

			// The worst-case `on_idle` weight must be non-zero and fit into a block.
			let w = Self::on_idle_weight();
			assert!(w != Weight::zero());
			assert!(w.all_lte(T::BlockWeights::get().max_block));

			// NOTE(review): `0.15` is presumably the accepted relative error of the weight
			// functions - confirm against `WeightInfoExt::check_accuracy`.
			<T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
		}
274
275		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
276			let mut meter = WeightMeter::with_limit(limit);
277
278			if meter.try_consume(Self::on_idle_weight()).is_err() {
279				tracing::debug!(
280					target: LOG_TARGET,
281					"Not enough weight for on_idle. {} < {}",
282					Self::on_idle_weight(), limit
283				);
284				return meter.consumed()
285			}
286
287			migration::v3::lazy_migrate_inbound_queue::<T>();
288
289			meter.consumed()
290		}
291	}
292
	/// Events deposited by this pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// An HRMP message was sent to a sibling parachain.
		// NOTE(review): the emitting code is not visible in this part of the file;
		// `message_hash` is presumably the hash of the sent XCM - confirm at the call site.
		XcmpMessageSent { message_hash: XcmHash },
	}
299
	/// Errors returned by this pallet.
	#[pallet::error]
	pub enum Error<T> {
		/// Setting the queue config failed since one of its values was invalid.
		///
		/// Returned by `QueueConfigData::validate`.
		BadQueueConfig,
		/// The execution is already suspended.
		AlreadySuspended,
		/// The execution is already resumed.
		AlreadyResumed,
		/// There are too many active outbound channels.
		///
		/// Returned when a signal has to be queued for a recipient that is not yet tracked in
		/// `OutboundXcmpStatus` and that list is full.
		TooManyActiveOutboundChannels,
		/// The message is too big.
		TooBig,
	}
313
	/// The suspended inbound XCMP channels. All others are not suspended.
	///
	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
	/// within the block and therefore only included once in the proof size.
	///
	/// A sender is inserted once a `Suspend` signal was queued for it and removed once a `Resume`
	/// signal was queued for it (see the `OnQueueChanged` implementation).
	///
	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
	/// will be smaller.
	#[pallet::storage]
	pub type InboundXcmpSuspended<T: Config> =
		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
325
	/// The outbound XCMP channels that are currently tracked, in the order in which they were
	/// added, each described by an [`OutboundChannelDetails`].
	///
	/// Every entry records the recipient, the channel state, whether a signal message is
	/// waiting to be sent, and the `first_index`/`last_index` pair of its message pages. The
	/// number of queued pages is `last_index - first_index`; if the two indices are equal the
	/// channel has no message pages queued. Page indices are `u16`, so we assume queues grow no
	/// greater than 65535 items. Signal messages are not part of the page range; they are kept
	/// separately in `SignalMessages`.
	#[pallet::storage]
	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
		_,
		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
		ValueQuery,
	>;
338
	/// The messages outbound in a given XCMP channel.
	///
	/// Keyed by the recipient and the page index. Each value is one page: the encoded
	/// `XcmpMessageFormat` followed by the concatenated encoded fragments.
	#[pallet::storage]
	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
		_,
		Blake2_128Concat,
		ParaId,
		Twox64Concat,
		u16,
		WeakBoundedVec<u8, T::MaxPageSize>,
		ValueQuery,
	>;
350
	/// Any signal messages waiting to be sent.
	///
	/// Keyed by the recipient. Queuing a new signal for a recipient replaces the page that was
	/// stored for it before.
	#[pallet::storage]
	pub(super) type SignalMessages<T: Config> =
		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
355
	/// The configuration which controls the dynamics of the outbound queue.
	///
	/// Holds the suspend, drop and resume thresholds, see [`QueueConfigData`].
	// NOTE(review): in the visible code the thresholds are compared against the footprint of
	// the inbound `T::XcmpQueue`; "outbound" above may be stale wording - confirm.
	#[pallet::storage]
	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
359
	/// Whether or not the XCMP queue is suspended from executing incoming XCMs or not.
	///
	/// Set by `suspend_xcm_execution`, cleared by `resume_xcm_execution` and consulted by the
	/// `QueuePausedQuery` implementation of this pallet.
	#[pallet::storage]
	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
363
	/// The factor to multiply the base delivery fee by.
	///
	/// Stored per recipient `ParaId`. Entries that were never written read as the value
	/// provided by `GetMinFeeFactor`.
	#[pallet::storage]
	pub(super) type DeliveryFeeFactor<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
368}
369
/// The state of an outbound XCMP channel, as tracked in [`OutboundChannelDetails`].
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
	/// The channel is not suspended. This is the state of a newly tracked channel.
	Ok,
	/// The channel was suspended; set when a `Suspend` signal from the recipient is handled.
	Suspended,
}
375
/// Struct containing detailed information about the outbound channel.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
	/// The `ParaId` of the parachain that this channel is connected with.
	recipient: ParaId,
	/// The state of the channel.
	state: OutboundState,
	/// Whether or not any signals exist in this channel.
	signals_exist: bool,
	/// The index of the first outbound message page of this channel in
	/// `OutboundXcmpMessages`.
	first_index: u16,
	/// The index at which the next new page will be stored, i.e. one past the last queued
	/// page. `last_index - first_index` is the number of queued pages.
	last_index: u16,
}
390
391impl OutboundChannelDetails {
392	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
393		OutboundChannelDetails {
394			recipient,
395			state: OutboundState::Ok,
396			signals_exist: false,
397			first_index: 0,
398			last_index: 0,
399		}
400	}
401
402	pub fn with_signals(mut self) -> OutboundChannelDetails {
403		self.signals_exist = true;
404		self
405	}
406
407	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
408		self.state = OutboundState::Suspended;
409		self
410	}
411}
412
/// The page-count thresholds that drive suspending, resuming and dropping on a channel.
///
/// A valid configuration satisfies
/// `0 < resume_threshold < suspend_threshold <= drop_threshold`, see [`Self::validate`].
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
	/// The number of pages which must be in the queue for the other side to be told to suspend
	/// their sending.
	suspend_threshold: u32,
	/// The number of pages which must be in the queue after which we drop any further messages
	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
	/// to suspend the channel.
	drop_threshold: u32,
	/// The number of pages which the queue must be reduced to before it signals that
	/// message sending may recommence after it has been suspended.
	resume_threshold: u32,
}
426
427impl Default for QueueConfigData {
428	fn default() -> Self {
429		// NOTE that these default values are only used on genesis. They should give a rough idea of
430		// what to set these values to, but is in no way a requirement.
431		Self {
432			drop_threshold: 48,    // 64KiB * 48 = 3MiB
433			suspend_threshold: 32, // 64KiB * 32 = 2MiB
434			resume_threshold: 8,   // 64KiB * 8 = 512KiB
435		}
436	}
437}
438
439impl QueueConfigData {
440	/// Validate all assumptions about `Self`.
441	///
442	/// Should be called prior to accepting this as new config.
443	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
444		if self.resume_threshold < self.suspend_threshold &&
445			self.suspend_threshold <= self.drop_threshold &&
446			self.resume_threshold > 0
447		{
448			Ok(())
449		} else {
450			Err(Error::<T>::BadQueueConfig.into())
451		}
452	}
453}
454
/// A signal that is exchanged between the two ends of an XCMP channel, in a page of format
/// `XcmpMessageFormat::Signals`.
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
pub enum ChannelSignal {
	/// Sent when the inbound queue of the sender's messages reached the suspend threshold;
	/// on receipt the outbound channel to the sender of the signal is suspended.
	Suspend,
	/// Sent when a suspended inbound queue shrank to the resume threshold; on receipt the
	/// outbound channel to the sender of the signal is resumed.
	Resume,
}
460
461impl<T: Config> Pallet<T> {
462	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
463	///
464	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
465	/// appended onto. Whether earlier unused space is used for the fragment at the risk of sending
466	/// it out of order is determined with `qos`. NOTE: For any two messages to be guaranteed to be
467	/// dispatched in order, then both must be sent with `ServiceQuality::Ordered`.
468	///
469	/// ## Background
470	///
471	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
472	///
473	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
474	/// message FRAGMENTs.
475	///
476	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
477	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
478	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
479	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
480	/// we can concatenate them into a single aggregate blob without needing to be concerned
481	/// about encoding fragment boundaries.
482	///
483	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
484	/// fragment.
485	fn send_fragment<Fragment: Encode>(
486		recipient: ParaId,
487		format: XcmpMessageFormat,
488		fragment: Fragment,
489	) -> Result<u32, MessageSendError> {
490		let encoded_fragment = fragment.encode();
491
492		// Optimization note: `max_message_size` could potentially be stored in
493		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
494
495		let channel_info =
496			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
497		// Max message size refers to aggregates, or pages. Not to individual fragments.
498		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
499		let format_size = format.encoded_size();
500		// We check the encoded fragment length plus the format size against the max message size
501		// because the format is concatenated if a new page is needed.
502		let size_to_check = encoded_fragment
503			.len()
504			.checked_add(format_size)
505			.ok_or(MessageSendError::TooBig)?;
506		if size_to_check > max_message_size {
507			return Err(MessageSendError::TooBig)
508		}
509
510		let mut all_channels = <OutboundXcmpStatus<T>>::get();
511		let channel_details = if let Some(details) =
512			all_channels.iter_mut().find(|channel| channel.recipient == recipient)
513		{
514			details
515		} else {
516			all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
517				tracing::error!(target: LOG_TARGET, error=?e, "Failed to activate HRMP channel");
518				MessageSendError::TooManyChannels
519			})?;
520			all_channels
521				.last_mut()
522				.expect("can't be empty; a new element was just pushed; qed")
523		};
524		let have_active = channel_details.last_index > channel_details.first_index;
525		// Try to append fragment to the last page, if there is enough space.
526		// We return the size of the last page inside of the option, to not calculate it again.
527		let appended_to_last_page = have_active
528			.then(|| {
529				<OutboundXcmpMessages<T>>::try_mutate(
530					recipient,
531					channel_details.last_index - 1,
532					|page| {
533						if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
534							defensive!("Bad format in outbound queue; dropping message");
535							return Err(())
536						}
537						if page.len() + encoded_fragment.len() > max_message_size {
538							return Err(())
539						}
540						for frag in encoded_fragment.iter() {
541							page.try_push(*frag)?;
542						}
543						Ok(page.len())
544					},
545				)
546				.ok()
547			})
548			.flatten();
549
550		let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
551			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
552			(number_of_pages, size)
553		} else {
554			// Need to add a new page.
555			let page_index = channel_details.last_index;
556			channel_details.last_index += 1;
557			let mut new_page = format.encode();
558			new_page.extend_from_slice(&encoded_fragment[..]);
559			let last_page_size = new_page.len();
560			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
561			let bounded_page =
562				BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
563					tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
564					MessageSendError::TooBig
565				})?;
566			let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
567			<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
568			<OutboundXcmpStatus<T>>::put(all_channels);
569			(number_of_pages, last_page_size)
570		};
571
572		// We have to count the total size here since `channel_info.total_size` is not updated at
573		// this point in time. We assume all previous pages are filled, which, in practice, is not
574		// always the case.
575		let total_size =
576			number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
577		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
578		if total_size > threshold {
579			Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
580		}
581
582		Ok(number_of_pages)
583	}
584
585	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
586	/// block.
587	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
588		let mut s = <OutboundXcmpStatus<T>>::get();
589		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
590			details.signals_exist = true;
591		} else {
592			s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
593				tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
594				Error::<T>::TooManyActiveOutboundChannels
595			})?;
596		}
597
598		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
599			(XcmpMessageFormat::Signals, signal).encode(),
600		)
601		.map_err(|error| {
602			tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
603			Error::<T>::TooBig
604		})?;
605		let page = WeakBoundedVec::force_from(page.into_inner(), None);
606
607		<SignalMessages<T>>::insert(dest, page);
608		<OutboundXcmpStatus<T>>::put(s);
609		Ok(())
610	}
611
612	fn suspend_channel(target: ParaId) {
613		<OutboundXcmpStatus<T>>::mutate(|s| {
614			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
615				let ok = details.state == OutboundState::Ok;
616				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
617				details.state = OutboundState::Suspended;
618			} else {
619				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
620					defensive!("Cannot pause channel; too many outbound channels");
621				}
622			}
623		});
624	}
625
626	fn resume_channel(target: ParaId) {
627		<OutboundXcmpStatus<T>>::mutate(|s| {
628			if let Some(index) = s.iter().position(|item| item.recipient == target) {
629				let suspended = s[index].state == OutboundState::Suspended;
630				defensive_assert!(
631					suspended,
632					"WARNING: Attempt to resume channel that was not suspended."
633				);
634				if s[index].first_index == s[index].last_index {
635					s.remove(index);
636				} else {
637					s[index].state = OutboundState::Ok;
638				}
639			} else {
640				defensive!("WARNING: Attempt to resume channel that was not suspended.");
641			}
642		});
643	}
644
645	fn enqueue_xcmp_messages<'a>(
646		sender: ParaId,
647		xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
648		meter: &mut WeightMeter,
649	) -> Result<(), ()> {
650		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
651		let batches_footprints =
652			T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
653
654		let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
655			let required_weight = T::WeightInfo::enqueue_xcmp_messages(
656				batches_footprints.first_page_pos.saturated_into(),
657				batch_info,
658			);
659
660			match meter.can_consume(required_weight) {
661				true => core::cmp::Ordering::Less,
662				false => core::cmp::Ordering::Greater,
663			}
664		});
665
666		meter.consume(T::WeightInfo::enqueue_xcmp_messages(
667			batches_footprints.first_page_pos.saturated_into(),
668			best_batch_footprint,
669		));
670		T::XcmpQueue::enqueue_messages(
671			xcms.iter().take(best_batch_footprint.msgs_count).copied(),
672			sender,
673		);
674
675		if best_batch_footprint.msgs_count < xcms.len() {
676			tracing::error!(
677				target: LOG_TARGET,
678				used_weight=?meter.consumed_ratio(),
679				"Out of weight: cannot enqueue entire XCMP messages batch; \
680				dropped some or all messages in batch."
681			);
682			return Err(());
683		}
684		Ok(())
685	}
686
687	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
688	/// individual items.
689	///
690	/// We directly encode them again since that is needed later on.
691	///
692	/// On error returns a partial batch with all the XCMs processed before the failure.
693	/// This can happen in case of a decoding/re-encoding failure.
694	pub(crate) fn take_first_concatenated_xcm<'a>(
695		data: &mut &'a [u8],
696		meter: &mut WeightMeter,
697	) -> Result<Option<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>, ()> {
698		if data.is_empty() {
699			return Ok(None)
700		}
701
702		// Let's make sure that we can decode at least an empty xcm message.
703		let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
704		if meter.try_consume(base_weight).is_err() {
705			defensive!("Out of weight; could not decode all; dropping");
706			return Err(())
707		}
708
709		let input_data = &mut &data[..];
710		let mut input = codec::CountedInput::new(input_data);
711		VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
712			|error| {
713				tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
714				()
715			},
716		)?;
717		let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
718		*data = remaining_data;
719
720		// Consume the extra weight that it took to decode this message.
721		// This depends on the message len in bytes.
722		// Saturates if it's over the limit.
723		let extra_weight =
724			T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
725		meter.consume(extra_weight);
726
727		let xcm = Some(BoundedSlice::try_from(xcm_data).map_err(|error| {
728			tracing::error!(
729				target: LOG_TARGET,
730				?error,
731				"Failed to take XCM after decoding: message is too long"
732			);
733			()
734		})?);
735
736		Ok(xcm)
737	}
738
739	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
740	/// batches.
741	///
742	/// We directly encode them again since that is needed later on.
743	pub(crate) fn take_first_concatenated_xcms<'a>(
744		data: &mut &'a [u8],
745		batch_size: usize,
746		meter: &mut WeightMeter,
747	) -> Result<
748		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
749		Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
750	> {
751		let mut batch = vec![];
752		loop {
753			match Self::take_first_concatenated_xcm(data, meter) {
754				Ok(Some(xcm)) => {
755					batch.push(xcm);
756					if batch.len() >= batch_size {
757						return Ok(batch);
758					}
759				},
760				Ok(None) => return Ok(batch),
761				Err(_) => return Err(batch),
762			}
763		}
764	}
765
766	/// The worst-case weight of `on_idle`.
767	pub fn on_idle_weight() -> Weight {
768		<T as crate::Config>::WeightInfo::on_idle_good_msg()
769			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
770	}
771
772	#[cfg(feature = "bridging")]
773	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
774		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
775	}
776
777	#[cfg(feature = "bridging")]
778	/// Returns tuple of `OutboundState` and number of queued pages.
779	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
780		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
781			let queued_pages = c.last_index.saturating_sub(c.first_index);
782			(c.state, queued_pages)
783		})
784	}
785}
786
impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
	// Suspends/Resumes the queue when certain thresholds are reached.
	//
	// `fp` is the footprint of the queue of messages received from `para`. The set of suspended
	// senders is only written back after the corresponding signal was queued successfully, so
	// that a failed signal is retried on a later change of the queue.
	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();

		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
		let suspended = suspended_channels.contains(&para);

		if suspended && fp.ready_pages <= resume_threshold {
			// The queue drained far enough: tell the sender that it may send again.
			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
				tracing::error!(
					target: LOG_TARGET,
					error=?err,
					sibling=?para,
					"defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
				);
			} else {
				suspended_channels.remove(&para);
				<InboundXcmpSuspended<T>>::put(suspended_channels);
			}
		} else if !suspended && fp.ready_pages >= suspend_threshold {
			// The queue grew too large: ask the sender to stop sending.
			tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");

			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
				// It will retry if `drop_threshold` is not reached, but it could be too late.
				tracing::error!(
					target: LOG_TARGET, error=?err,
					"defensive: Could not send suspension signal; future messages may be dropped."
				);
			} else if let Err(err) = suspended_channels.try_insert(para) {
				// The signal was queued, but the sender cannot be recorded as suspended.
				tracing::error!(
					target: LOG_TARGET,
					error=?err,
					sibling=?para,
					"Too many channels suspended; cannot suspend sibling; further messages may be dropped."
				);
			} else {
				<InboundXcmpSuspended<T>>::put(suspended_channels);
			}
		}
	}
}
829
830impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
831	fn is_paused(para: &ParaId) -> bool {
832		if !QueueSuspended::<T>::get() {
833			return false
834		}
835
836		// Make an exception for the superuser queue:
837		let sender_origin = T::ControllerOriginConverter::convert_origin(
838			(Parent, Parachain((*para).into())),
839			OriginKind::Superuser,
840		);
841		let is_controller =
842			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
843
844		!is_controller
845	}
846}
847
impl<T: Config> XcmpMessageHandler for Pallet<T> {
	/// Processes inbound XCMP pages while metering weight against `max_weight`.
	///
	/// Every item of `iter` is a `(sender, sent_at, data)` triple; `sent_at` is not used here.
	/// The start of `data` is decoded as an `XcmpMessageFormat`, which selects how the rest of
	/// the page is handled:
	/// - `Signals`: at most `MAX_SIGNALS_PER_PAGE` `ChannelSignal`s are decoded and applied
	///   through `suspend_channel` / `resume_channel`.
	/// - `ConcatenatedVersionedXcm`: the page is consumed in batches obtained from
	///   `take_first_concatenated_xcms` and each batch is handed to `enqueue_xcmp_messages`.
	/// - `ConcatenatedEncodedBlob`: not handled; the page is dropped.
	///
	/// Pages with an undecodable format are skipped. Returns the weight recorded as consumed by
	/// the internal `WeightMeter`.
	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
		iter: I,
		max_weight: Weight,
	) -> Weight {
		let mut meter = WeightMeter::with_limit(max_weight);

		// Senders for which `uncached_enqueue_xcmp_messages` was already charged in this call.
		let mut known_xcm_senders = BTreeSet::new();
		for (sender, _sent_at, mut data) in iter {
			// Decoding advances `data` past the format prefix.
			let format = match XcmpMessageFormat::decode(&mut data) {
				Ok(f) => f,
				Err(_) => {
					defensive!("Unknown XCMP message format - dropping");
					continue
				},
			};

			match format {
				XcmpMessageFormat::Signals => {
					// Bounded by `MAX_SIGNALS_PER_PAGE`; any remaining bytes of the page are
					// ignored once the bound is hit or the loop breaks.
					let mut signal_count = 0;
					while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
						signal_count += 1;
						match ChannelSignal::decode(&mut data) {
							Ok(ChannelSignal::Suspend) => {
								// Charge the weight first; the signal is only applied if it fits.
								if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
									defensive!(
										"Not enough weight to process suspend signal - dropping"
									);
									break
								}
								Self::suspend_channel(sender)
							},
							Ok(ChannelSignal::Resume) => {
								// Charge the weight first; the signal is only applied if it fits.
								if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
									defensive!(
										"Not enough weight to process resume signal - dropping"
									);
									break
								}
								Self::resume_channel(sender)
							},
							Err(_) => {
								defensive!("Undecodable channel signal - dropping");
								break
							},
						}
					}
				},
				XcmpMessageFormat::ConcatenatedVersionedXcm => {
					// `insert` returns `true` only for the first page of a sender in this call,
					// so the uncached enqueue weight is charged once per sender.
					if known_xcm_senders.insert(sender) {
						if meter
							.try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
							.is_err()
						{
							defensive!(
								"Out of weight: cannot enqueue XCMP messages; dropping page; \
                                    Used weight: ",
								meter.consumed_ratio()
							);
							continue;
						}
					}

					let mut can_process_next_batch = true;
					while can_process_next_batch {
						let batch = match Self::take_first_concatenated_xcms(
							&mut data,
							XCM_BATCH_SIZE,
							&mut meter,
						) {
							Ok(batch) => batch,
							Err(batch) => {
								// The batch carried by the error is still enqueued below (if
								// non-empty); clearing the flag ends the loop afterwards.
								can_process_next_batch = false;
								defensive!(
									"HRMP inbound decode stream broke; page will be dropped."
								);
								batch
							},
						};
						// An empty batch means there is nothing left to enqueue for this page.
						if batch.is_empty() {
							break;
						}

						// Stop processing this page as soon as enqueueing fails.
						if let Err(()) = Self::enqueue_xcmp_messages(sender, &batch, &mut meter) {
							break
						}
					}
				},
				XcmpMessageFormat::ConcatenatedEncodedBlob => {
					defensive!("Blob messages are unhandled - dropping");
					continue
				},
			}
		}

		meter.consumed()
	}
}
946
947impl<T: Config> XcmpMessageSource for Pallet<T> {
948	fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
949		let mut statuses = <OutboundXcmpStatus<T>>::get();
950		let old_statuses_len = statuses.len();
951		let max_message_count = statuses.len().min(maximum_channels);
952		let mut result = Vec::with_capacity(max_message_count);
953
954		for status in statuses.iter_mut() {
955			let OutboundChannelDetails {
956				recipient: para_id,
957				state: outbound_state,
958				mut signals_exist,
959				mut first_index,
960				mut last_index,
961			} = *status;
962
963			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
964				ChannelStatus::Closed => {
965					// This means that there is no such channel anymore. Nothing to be done but
966					// swallow the messages and discard the status.
967					for i in first_index..last_index {
968						<OutboundXcmpMessages<T>>::remove(para_id, i);
969					}
970					if signals_exist {
971						<SignalMessages<T>>::remove(para_id);
972					}
973					*status = OutboundChannelDetails::new(para_id);
974					continue
975				},
976				ChannelStatus::Full => continue,
977				ChannelStatus::Ready(n, e) => (n, e),
978			};
979
980			// This is a hard limit from the host config; not even signals can bypass it.
981			if result.len() == max_message_count {
982				// We check this condition in the beginning of the loop so that we don't include
983				// a message where the limit is 0.
984				break
985			}
986
987			let page = if signals_exist {
988				let page = <SignalMessages<T>>::get(para_id);
989				defensive_assert!(!page.is_empty(), "Signals must exist");
990
991				if page.len() < max_size_now {
992					<SignalMessages<T>>::remove(para_id);
993					signals_exist = false;
994					page
995				} else {
996					defensive!("Signals should fit into a single page");
997					continue
998				}
999			} else if outbound_state == OutboundState::Suspended {
1000				// Signals are exempt from suspension.
1001				continue
1002			} else if last_index > first_index {
1003				let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
1004				if page.len() < max_size_now {
1005					<OutboundXcmpMessages<T>>::remove(para_id, first_index);
1006					first_index += 1;
1007					page
1008				} else {
1009					continue
1010				}
1011			} else {
1012				continue
1013			};
1014			if first_index == last_index {
1015				first_index = 0;
1016				last_index = 0;
1017			}
1018
1019			if page.len() > max_size_ever {
1020				// TODO: #274 This means that the channel's max message size has changed since
1021				//   the message was sent. We should parse it and split into smaller messages but
1022				//   since it's so unlikely then for now we just drop it.
1023				defensive!("WARNING: oversize message in queue - dropping");
1024			} else {
1025				result.push((para_id, page.into_inner()));
1026			}
1027
1028			let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1029				Some(channel_info) => channel_info.max_total_size,
1030				None => {
1031					tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1032					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
1033				},
1034			};
1035			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1036			let remaining_total_size: usize = (first_index..last_index)
1037				.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1038				.sum();
1039			if remaining_total_size <= threshold as usize {
1040				Self::decrease_fee_factor(para_id);
1041			}
1042
1043			*status = OutboundChannelDetails {
1044				recipient: para_id,
1045				state: outbound_state,
1046				signals_exist,
1047				first_index,
1048				last_index,
1049			};
1050		}
1051		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1052
1053		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
1054		// criteria requirement.
1055		result.sort_by_key(|m| m.0);
1056
1057		// Prune hrmp channels that became empty. Additionally, because it may so happen that we
1058		// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
1059		// change the order. Otherwise, the next `on_finalize` we will again give attention
1060		// only to those channels that happen to be in the beginning, until they are emptied.
1061		// This leads to "starvation" of the channels near to the end.
1062		//
1063		// To mitigate this we shift all processed elements towards the end of the vector using
1064		// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
1065		statuses.retain(|x| {
1066			x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1067		});
1068
1069		// old_status_len must be >= status.len() since we never add anything to status.
1070		let pruned = old_statuses_len - statuses.len();
1071		// removing an item from status implies a message being sent, so the result messages must
1072		// be no less than the pruned channels.
1073		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1074			"Could not store HRMP channels config. Some HRMP channels may be broken.",
1075		);
1076
1077		<OutboundXcmpStatus<T>>::put(statuses);
1078
1079		result
1080	}
1081}
1082
1083/// Xcm sender for sending to a sibling parachain.
1084impl<T: Config> SendXcm for Pallet<T> {
1085	type Ticket = (ParaId, VersionedXcm<()>);
1086
1087	fn validate(
1088		dest: &mut Option<Location>,
1089		msg: &mut Option<Xcm<()>>,
1090	) -> SendResult<(ParaId, VersionedXcm<()>)> {
1091		let d = dest.take().ok_or(SendError::MissingArgument)?;
1092
1093		match d.unpack() {
1094			// An HRMP message for a sibling parachain.
1095			(1, [Parachain(id)]) => {
1096				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1097				let id = ParaId::from(*id);
1098				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1099				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1100					.map_err(|()| SendError::DestinationUnsupported)?;
1101				versioned_xcm
1102					.check_is_decodable()
1103					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
1104
1105				Ok(((id, versioned_xcm), price))
1106			},
1107			_ => {
1108				// Anything else is unhandled. This includes a message that is not meant for us.
1109				// We need to make sure that dest/msg is not consumed here.
1110				*dest = Some(d);
1111				Err(SendError::NotApplicable)
1112			},
1113		}
1114	}
1115
1116	fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1117		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1118
1119		match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1120			Ok(_) => {
1121				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1122				Ok(hash)
1123			},
1124			Err(e) => {
1125				tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1126				Err(SendError::Transport(e.into()))
1127			},
1128		}
1129	}
1130}
1131
1132impl<T: Config> InspectMessageQueues for Pallet<T> {
1133	fn clear_messages() {
1134		// Best effort.
1135		let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1136		OutboundXcmpStatus::<T>::mutate(|details_vec| {
1137			for details in details_vec {
1138				details.first_index = 0;
1139				details.last_index = 0;
1140			}
1141		});
1142	}
1143
1144	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1145		use xcm::prelude::*;
1146
1147		OutboundXcmpMessages::<T>::iter()
1148			.map(|(para_id, _, messages)| {
1149				let mut data = &messages[..];
1150				let decoded_format = XcmpMessageFormat::decode(&mut data).unwrap();
1151				if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
1152					panic!("Unexpected format.")
1153				}
1154				let mut decoded_messages = Vec::new();
1155				while !data.is_empty() {
1156					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1157						MAX_XCM_DECODE_DEPTH,
1158						&mut data,
1159					)
1160					.unwrap();
1161					decoded_messages.push(decoded_message);
1162				}
1163
1164				(
1165					VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1166					decoded_messages,
1167				)
1168			})
1169			.collect()
1170	}
1171}
1172
1173impl<T: Config> FeeTracker for Pallet<T> {
1174	type Id = ParaId;
1175
1176	fn get_fee_factor(id: Self::Id) -> FixedU128 {
1177		<DeliveryFeeFactor<T>>::get(id)
1178	}
1179
1180	fn set_fee_factor(id: Self::Id, val: FixedU128) {
1181		<DeliveryFeeFactor<T>>::set(id, val);
1182	}
1183}