cumulus_pallet_xcmp_queue/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: Apache-2.0
4
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// 	http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17//! A pallet which uses the XCMP transport layer to handle both incoming and outgoing XCM message
18//! sending and dispatch, queuing, signalling and backpressure. To do so, it implements:
19//! * `XcmpMessageHandler`
20//! * `XcmpMessageSource`
21//!
22//! Also provides an implementation of `SendXcm` which can be placed in a router tuple for relaying
23//! XCM over XCMP if the destination is `Parent/Parachain`. It requires an implementation of
24//! `XcmExecutor` for dispatching incoming XCM messages.
25//!
26//! To prevent out of memory errors on the `OutboundXcmpMessages` queue, an exponential fee factor
27//! (`DeliveryFeeFactor`) is set, much like the one used in DMP.
28//! The fee factor increases whenever the total size of messages in a particular channel passes a
29//! threshold. This threshold is defined as a percentage of the maximum total size the channel can
30//! have. More concretely, the threshold is `max_total_size` / `THRESHOLD_FACTOR`, where:
31//! - `max_total_size` is the maximum size, in bytes, of the channel, not number of messages.
32//! It is defined in the channel configuration.
33//! - `THRESHOLD_FACTOR` just declares which percentage of the max size is the actual threshold.
34//! If it's 2, then the threshold is half of the max size, if it's 4, it's a quarter, and so on.
35
36#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::BoundedBTreeSet;
60use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62	relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63	ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67	defensive, defensive_assert,
68	traits::{
69		Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70		QueuePausedQuery,
71	},
72	weights::{Weight, WeightMeter},
73	BoundedVec,
74};
75use pallet_message_queue::OnQueueChanged;
76use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
77use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
78use scale_info::TypeInfo;
79use sp_core::MAX_POSSIBLE_ALLOCATION;
80use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
81use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
82use xcm_builder::InspectMessageQueues;
83use xcm_executor::traits::ConvertOrigin;
84
85pub use pallet::*;
86
/// Index used to identify overweight XCMs.
pub type OverweightIndex = u64;
/// The maximum length of a single XCMP message.
///
/// This is the `MaxMessageLen` of the [`EnqueueMessage`] implementation configured as
/// [`Config::XcmpQueue`].
pub type MaxXcmpMessageLenOf<T> =
	<<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
92
/// Log target used by the `log` calls of this pallet that set an explicit target.
const LOG_TARGET: &str = "xcmp_queue";
// NOTE(review): not referenced in the visible part of this file — presumably consumed by the
// weight/benchmark helpers; confirm before changing.
const DEFAULT_POV_SIZE: u64 = 64 * 1024; // 64 KiB
/// The size of an XCM messages batch, i.e. the number of messages grouped together.
pub const XCM_BATCH_SIZE: usize = 250;
/// The maximum number of signals that we can have in an XCMP page.
///
/// When handling an inbound `Signals` page, at most this many signals are decoded; anything
/// after that is ignored.
pub const MAX_SIGNALS_PER_PAGE: usize = 3;
99
/// Constants related to delivery fee calculation
pub mod delivery_fee_constants {
	/// Divisor applied to a channel's `max_total_size` to obtain the size threshold above which
	/// the delivery fee factor is increased.
	///
	/// With a value of 2, fees will start increasing when the queue is half full.
	pub const THRESHOLD_FACTOR: u32 = 2;
}
105
106#[frame_support::pallet]
107pub mod pallet {
108	use super::*;
109	use frame_support::{pallet_prelude::*, Twox64Concat};
110	use frame_system::pallet_prelude::*;
111
	#[pallet::pallet]
	#[pallet::storage_version(migration::STORAGE_VERSION)]
	/// The XCMP queue pallet. Its in-code storage version is [`migration::STORAGE_VERSION`].
	pub struct Pallet<T>(_);
115
	/// Configuration of the XCMP queue pallet.
	#[pallet::config]
	pub trait Config: frame_system::Config {
		/// The overarching runtime event type; must be constructible from this pallet's
		/// [`Event`].
		// NOTE(review): the `allow(deprecated)` presumably silences the FRAME deprecation of
		// explicitly declared `RuntimeEvent` items — confirm before removing.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// Information on the available XCMP channels.
		type ChannelInfo: GetChannelInfo;

		/// Means of converting an `Xcm` into a `VersionedXcm`.
		type VersionWrapper: WrapVersion;

		/// Enqueue an inbound horizontal message for later processing.
		///
		/// This defines the maximal message length via [`crate::MaxXcmpMessageLenOf`]. The pallet
		/// assumes that this hook will eventually process all the pushed messages.
		type XcmpQueue: EnqueueMessage<ParaId>
			+ QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;

		/// The maximum number of inbound XCMP channels that can be suspended simultaneously.
		///
		/// Any further channel suspensions will fail and messages may get dropped without further
		/// notice. Choosing a high value (1000) is okay; the trade-off that is described in
		/// [`InboundXcmpSuspended`] still applies at that scale.
		#[pallet::constant]
		type MaxInboundSuspended: Get<u32>;

		/// Maximal number of outbound XCMP channels that can have messages queued at the same time.
		///
		/// If this is reached, then no further messages can be sent to channels that do not yet
		/// have a message queued. This should be set to the expected maximum of outbound channels
		/// which is determined by [`Self::ChannelInfo`]. It is important to set this large enough,
		/// since otherwise the congestion control protocol will not work as intended and messages
		/// may be dropped. This value increases the PoV and should therefore not be picked too
		/// high. Governance needs to pay attention to not open more channels than this value.
		#[pallet::constant]
		type MaxActiveOutboundChannels: Get<u32>;

		/// The maximal page size for HRMP message pages.
		///
		/// A lower limit can be set dynamically, but this is the hard-limit for the PoV worst case
		/// benchmarking. The limit for the size of a message is slightly below this, since some
		/// overhead is incurred for encoding the format.
		#[pallet::constant]
		type MaxPageSize: Get<u32>;

		/// The origin that is allowed to resume or suspend the XCMP queue.
		type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;

		/// The conversion function used to attempt to convert an XCM `Location` origin to a
		/// superuser origin.
		///
		/// Used while the queue is suspended to decide whether a sibling's messages are exempt
		/// from the suspension.
		type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;

		/// The price for delivering an XCM to a sibling parachain destination.
		type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;

		/// The weight information of this pallet.
		type WeightInfo: WeightInfoExt;
	}
174
175	#[pallet::call]
176	impl<T: Config> Pallet<T> {
177		/// Suspends all XCM executions for the XCMP queue, regardless of the sender's origin.
178		///
179		/// - `origin`: Must pass `ControllerOrigin`.
180		#[pallet::call_index(1)]
181		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
182		pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
183			T::ControllerOrigin::ensure_origin(origin)?;
184
185			QueueSuspended::<T>::try_mutate(|suspended| {
186				if *suspended {
187					Err(Error::<T>::AlreadySuspended.into())
188				} else {
189					*suspended = true;
190					Ok(())
191				}
192			})
193		}
194
195		/// Resumes all XCM executions for the XCMP queue.
196		///
197		/// Note that this function doesn't change the status of the in/out bound channels.
198		///
199		/// - `origin`: Must pass `ControllerOrigin`.
200		#[pallet::call_index(2)]
201		#[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
202		pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
203			T::ControllerOrigin::ensure_origin(origin)?;
204
205			QueueSuspended::<T>::try_mutate(|suspended| {
206				if !*suspended {
207					Err(Error::<T>::AlreadyResumed.into())
208				} else {
209					*suspended = false;
210					Ok(())
211				}
212			})
213		}
214
215		/// Overwrites the number of pages which must be in the queue for the other side to be
216		/// told to suspend their sending.
217		///
218		/// - `origin`: Must pass `Root`.
219		/// - `new`: Desired value for `QueueConfigData.suspend_value`
220		#[pallet::call_index(3)]
221		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
222		pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
223			ensure_root(origin)?;
224
225			QueueConfig::<T>::try_mutate(|data| {
226				data.suspend_threshold = new;
227				data.validate::<T>()
228			})
229		}
230
231		/// Overwrites the number of pages which must be in the queue after which we drop any
232		/// further messages from the channel.
233		///
234		/// - `origin`: Must pass `Root`.
235		/// - `new`: Desired value for `QueueConfigData.drop_threshold`
236		#[pallet::call_index(4)]
237		#[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
238		pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
239			ensure_root(origin)?;
240
241			QueueConfig::<T>::try_mutate(|data| {
242				data.drop_threshold = new;
243				data.validate::<T>()
244			})
245		}
246
247		/// Overwrites the number of pages which the queue must be reduced to before it signals
248		/// that message sending may recommence after it has been suspended.
249		///
250		/// - `origin`: Must pass `Root`.
251		/// - `new`: Desired value for `QueueConfigData.resume_threshold`
252		#[pallet::call_index(5)]
253		#[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
254		pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
255			ensure_root(origin)?;
256
257			QueueConfig::<T>::try_mutate(|data| {
258				data.resume_threshold = new;
259				data.validate::<T>()
260			})
261		}
262	}
263
264	#[pallet::hooks]
265	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
266		fn integrity_test() {
267			assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
268
269			let w = Self::on_idle_weight();
270			assert!(w != Weight::zero());
271			assert!(w.all_lte(T::BlockWeights::get().max_block));
272
273			<T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
274		}
275
276		fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
277			let mut meter = WeightMeter::with_limit(limit);
278
279			if meter.try_consume(Self::on_idle_weight()).is_err() {
280				log::debug!(
281					"Not enough weight for on_idle. {} < {}",
282					Self::on_idle_weight(),
283					limit
284				);
285				return meter.consumed()
286			}
287
288			migration::v3::lazy_migrate_inbound_queue::<T>();
289
290			meter.consumed()
291		}
292	}
293
	/// Events of this pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// An HRMP message was sent to a sibling parachain.
		///
		/// `message_hash` is the XCM hash of the sent message.
		XcmpMessageSent { message_hash: XcmHash },
	}
300
	/// Errors of this pallet.
	#[pallet::error]
	pub enum Error<T> {
		/// Setting the queue config failed since one of its values was invalid.
		///
		/// Returned by `QueueConfigData::validate`.
		BadQueueConfig,
		/// The execution is already suspended.
		AlreadySuspended,
		/// The execution is already resumed.
		AlreadyResumed,
		/// There are too many active outbound channels.
		///
		/// Returned when a signal needs a new entry in `OutboundXcmpStatus` but the list is full.
		TooManyActiveOutboundChannels,
		/// The message is too big.
		TooBig,
	}
314
	/// The suspended inbound XCMP channels. All others are not suspended.
	///
	/// A sibling is inserted after a `Suspend` signal was queued for it and removed after a
	/// `Resume` signal was queued for it (see `on_queue_changed`).
	///
	/// This is a `StorageValue` instead of a `StorageMap` since we expect multiple reads per block
	/// to different keys with a one byte payload. The access to `BoundedBTreeSet` will be cached
	/// within the block and therefore only included once in the proof size.
	///
	/// NOTE: The PoV benchmarking cannot know this and will over-estimate, but the actual proof
	/// will be smaller.
	#[pallet::storage]
	pub type InboundXcmpSuspended<T: Config> =
		StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
326
	/// The non-empty XCMP channels in order of becoming non-empty, each described by an
	/// [`OutboundChannelDetails`]: recipient, state, whether a signal message is waiting to be
	/// sent, and the index of the first and last outbound message page.
	///
	/// If the two indices are equal, then it indicates an empty queue. We assume queues grow no
	/// greater than 65535 items.
	///
	/// NOTE(review): earlier revisions of this comment stated that queue indices for normal
	/// messages begin at one, with zero reserved for a high-priority signal message. In the
	/// code visible here, `OutboundChannelDetails::new` starts both indices at zero and the
	/// first page is stored at index zero, while signals live in `SignalMessages` — confirm
	/// and drop this note.
	#[pallet::storage]
	pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
		_,
		BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
		ValueQuery,
	>;
339
	/// The messages outbound in a given XCMP channel.
	///
	/// Keyed by the recipient `ParaId` and the `u16` page index. Each value is one page: the
	/// encoded `XcmpMessageFormat` followed by the encoded fragments appended to it.
	#[pallet::storage]
	pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
		_,
		Blake2_128Concat,
		ParaId,
		Twox64Concat,
		u16,
		WeakBoundedVec<u8, T::MaxPageSize>,
		ValueQuery,
	>;
351
	/// Any signal messages waiting to be sent.
	///
	/// Keyed by the destination `ParaId`. The value is the encoded
	/// `(XcmpMessageFormat::Signals, signal)` page; queuing a new signal for the same
	/// destination overwrites the previous entry.
	#[pallet::storage]
	pub(super) type SignalMessages<T: Config> =
		StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
356
	/// The configuration which controls the dynamics of the queue: the suspend, drop and resume
	/// page thresholds (see [`QueueConfigData`]).
	///
	/// NOTE(review): this was historically described as configuring the *outbound* queue; the
	/// uses visible in this file compare the thresholds against the footprint of *inbound*
	/// message queues — confirm the intended wording.
	#[pallet::storage]
	pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
360
	/// Whether the XCMP queue is suspended from executing incoming XCMs.
	///
	/// Toggled by `suspend_xcm_execution` and `resume_xcm_execution`.
	#[pallet::storage]
	pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
364
	/// The factor to multiply the base delivery fee by, tracked per destination `ParaId`.
	///
	/// Destinations without an entry read as the default supplied by `GetMinFeeFactor`.
	#[pallet::storage]
	pub(super) type DeliveryFeeFactor<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
369}
370
/// The state of an outbound XCMP channel.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub enum OutboundState {
	/// The channel is not suspended.
	Ok,
	/// The channel was suspended, e.g. because the recipient sent a `Suspend` signal.
	Suspended,
}
376
/// Struct containing detailed information about the outbound channel.
///
/// The queued pages of the channel are those with an index in `first_index..last_index`;
/// equal indices mean that no page is queued.
#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
pub struct OutboundChannelDetails {
	/// The `ParaId` of the parachain that this channel is connected with.
	recipient: ParaId,
	/// The state of the channel.
	state: OutboundState,
	/// Whether or not any signals exist in this channel.
	signals_exist: bool,
	/// The index of the first outbound message.
	first_index: u16,
	/// The index of the last outbound message.
	///
	/// This is one past the most recently written page: new pages are stored at `last_index`
	/// and fragments are appended to the page at `last_index - 1`.
	last_index: u16,
}
391
392impl OutboundChannelDetails {
393	pub fn new(recipient: ParaId) -> OutboundChannelDetails {
394		OutboundChannelDetails {
395			recipient,
396			state: OutboundState::Ok,
397			signals_exist: false,
398			first_index: 0,
399			last_index: 0,
400		}
401	}
402
403	pub fn with_signals(mut self) -> OutboundChannelDetails {
404		self.signals_exist = true;
405		self
406	}
407
408	pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
409		self.state = OutboundState::Suspended;
410		self
411	}
412}
413
/// The page thresholds which control suspension, resumption and dropping of messages.
///
/// A configuration is accepted by [`QueueConfigData::validate`] when
/// `0 < resume_threshold < suspend_threshold <= drop_threshold`.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
pub struct QueueConfigData {
	/// The number of pages which must be in the queue for the other side to be told to suspend
	/// their sending.
	suspend_threshold: u32,
	/// The number of pages which must be in the queue after which we drop any further messages
	/// from the channel. This should normally not happen since the `suspend_threshold` can be used
	/// to suspend the channel.
	drop_threshold: u32,
	/// The number of pages which the queue must be reduced to before it signals that
	/// message sending may recommence after it has been suspended.
	resume_threshold: u32,
}
427
428impl Default for QueueConfigData {
429	fn default() -> Self {
430		// NOTE that these default values are only used on genesis. They should give a rough idea of
431		// what to set these values to, but is in no way a requirement.
432		Self {
433			drop_threshold: 48,    // 64KiB * 48 = 3MiB
434			suspend_threshold: 32, // 64KiB * 32 = 2MiB
435			resume_threshold: 8,   // 64KiB * 8 = 512KiB
436		}
437	}
438}
439
440impl QueueConfigData {
441	/// Validate all assumptions about `Self`.
442	///
443	/// Should be called prior to accepting this as new config.
444	pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
445		if self.resume_threshold < self.suspend_threshold &&
446			self.suspend_threshold <= self.drop_threshold &&
447			self.resume_threshold > 0
448		{
449			Ok(())
450		} else {
451			Err(Error::<T>::BadQueueConfig.into())
452		}
453	}
454}
455
/// A signal exchanged between the two ends of an XCMP channel.
#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
pub enum ChannelSignal {
	/// Asks the other side to stop sending; on receipt, the outbound channel to the sender of
	/// the signal is suspended.
	Suspend,
	/// Tells the other side that sending may recommence; on receipt, the outbound channel to
	/// the sender of the signal is resumed.
	Resume,
}
461
462impl<T: Config> Pallet<T> {
463	/// Place a message `fragment` on the outgoing XCMP queue for `recipient`.
464	///
465	/// Format is the type of aggregate message that the `fragment` may be safely encoded and
466	/// appended onto. Whether earlier unused space is used for the fragment at the risk of sending
467	/// it out of order is determined with `qos`. NOTE: For any two messages to be guaranteed to be
468	/// dispatched in order, then both must be sent with `ServiceQuality::Ordered`.
469	///
470	/// ## Background
471	///
472	/// For our purposes, one HRMP "message" is actually an aggregated block of XCM "messages".
473	///
474	/// For the sake of clarity, we distinguish between them as message AGGREGATEs versus
475	/// message FRAGMENTs.
476	///
477	/// So each AGGREGATE is comprised of one or more concatenated SCALE-encoded `Vec<u8>`
478	/// FRAGMENTs. Though each fragment is already probably a SCALE-encoded Xcm, we can't be
479	/// certain, so we SCALE encode each `Vec<u8>` fragment in order to ensure we have the
480	/// length prefixed and can thus decode each fragment from the aggregate stream. With this,
481	/// we can concatenate them into a single aggregate blob without needing to be concerned
482	/// about encoding fragment boundaries.
483	///
484	/// If successful, returns the number of pages in the outbound queue after enqueuing the new
485	/// fragment.
486	fn send_fragment<Fragment: Encode>(
487		recipient: ParaId,
488		format: XcmpMessageFormat,
489		fragment: Fragment,
490	) -> Result<u32, MessageSendError> {
491		let encoded_fragment = fragment.encode();
492
493		// Optimization note: `max_message_size` could potentially be stored in
494		// `OutboundXcmpMessages` once known; that way it's only accessed when a new page is needed.
495
496		let channel_info =
497			T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
498		// Max message size refers to aggregates, or pages. Not to individual fragments.
499		let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
500		let format_size = format.encoded_size();
501		// We check the encoded fragment length plus the format size against the max message size
502		// because the format is concatenated if a new page is needed.
503		let size_to_check = encoded_fragment
504			.len()
505			.checked_add(format_size)
506			.ok_or(MessageSendError::TooBig)?;
507		if size_to_check > max_message_size {
508			return Err(MessageSendError::TooBig)
509		}
510
511		let mut all_channels = <OutboundXcmpStatus<T>>::get();
512		let channel_details = if let Some(details) =
513			all_channels.iter_mut().find(|channel| channel.recipient == recipient)
514		{
515			details
516		} else {
517			all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
518				log::error!("Failed to activate HRMP channel: {:?}", e);
519				MessageSendError::TooManyChannels
520			})?;
521			all_channels
522				.last_mut()
523				.expect("can't be empty; a new element was just pushed; qed")
524		};
525		let have_active = channel_details.last_index > channel_details.first_index;
526		// Try to append fragment to the last page, if there is enough space.
527		// We return the size of the last page inside of the option, to not calculate it again.
528		let appended_to_last_page = have_active
529			.then(|| {
530				<OutboundXcmpMessages<T>>::try_mutate(
531					recipient,
532					channel_details.last_index - 1,
533					|page| {
534						if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
535							defensive!("Bad format in outbound queue; dropping message");
536							return Err(())
537						}
538						if page.len() + encoded_fragment.len() > max_message_size {
539							return Err(())
540						}
541						for frag in encoded_fragment.iter() {
542							page.try_push(*frag)?;
543						}
544						Ok(page.len())
545					},
546				)
547				.ok()
548			})
549			.flatten();
550
551		let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
552			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
553			(number_of_pages, size)
554		} else {
555			// Need to add a new page.
556			let page_index = channel_details.last_index;
557			channel_details.last_index += 1;
558			let mut new_page = format.encode();
559			new_page.extend_from_slice(&encoded_fragment[..]);
560			let last_page_size = new_page.len();
561			let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
562			let bounded_page =
563				BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
564					log::debug!(target: LOG_TARGET, "Failed to create bounded message page: {error:?}");
565					MessageSendError::TooBig
566				})?;
567			let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
568			<OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
569			<OutboundXcmpStatus<T>>::put(all_channels);
570			(number_of_pages, last_page_size)
571		};
572
573		// We have to count the total size here since `channel_info.total_size` is not updated at
574		// this point in time. We assume all previous pages are filled, which, in practice, is not
575		// always the case.
576		let total_size =
577			number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
578		let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
579		if total_size > threshold {
580			Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
581		}
582
583		Ok(number_of_pages)
584	}
585
586	/// Sends a signal to the `dest` chain over XCMP. This is guaranteed to be dispatched on this
587	/// block.
588	fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
589		let mut s = <OutboundXcmpStatus<T>>::get();
590		if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
591			details.signals_exist = true;
592		} else {
593			s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
594				log::debug!(target: LOG_TARGET, "Failed to activate XCMP channel: {error:?}");
595				Error::<T>::TooManyActiveOutboundChannels
596			})?;
597		}
598
599		let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
600			(XcmpMessageFormat::Signals, signal).encode(),
601		)
602		.map_err(|error| {
603			log::debug!(target: LOG_TARGET, "Failed to encode signal message: {error:?}");
604			Error::<T>::TooBig
605		})?;
606		let page = WeakBoundedVec::force_from(page.into_inner(), None);
607
608		<SignalMessages<T>>::insert(dest, page);
609		<OutboundXcmpStatus<T>>::put(s);
610		Ok(())
611	}
612
613	fn suspend_channel(target: ParaId) {
614		<OutboundXcmpStatus<T>>::mutate(|s| {
615			if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
616				let ok = details.state == OutboundState::Ok;
617				defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
618				details.state = OutboundState::Suspended;
619			} else {
620				if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
621					defensive!("Cannot pause channel; too many outbound channels");
622				}
623			}
624		});
625	}
626
627	fn resume_channel(target: ParaId) {
628		<OutboundXcmpStatus<T>>::mutate(|s| {
629			if let Some(index) = s.iter().position(|item| item.recipient == target) {
630				let suspended = s[index].state == OutboundState::Suspended;
631				defensive_assert!(
632					suspended,
633					"WARNING: Attempt to resume channel that was not suspended."
634				);
635				if s[index].first_index == s[index].last_index {
636					s.remove(index);
637				} else {
638					s[index].state = OutboundState::Ok;
639				}
640			} else {
641				defensive!("WARNING: Attempt to resume channel that was not suspended.");
642			}
643		});
644	}
645
646	fn enqueue_xcmp_messages(
647		sender: ParaId,
648		xcms: &[BoundedVec<u8, MaxXcmpMessageLenOf<T>>],
649		meter: &mut WeightMeter,
650	) -> Result<(), ()> {
651		let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
652		let batches_footprints = T::XcmpQueue::get_batches_footprints(
653			sender,
654			xcms.iter().map(|xcm| xcm.as_bounded_slice()),
655			drop_threshold,
656		);
657
658		let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
659			let required_weight = T::WeightInfo::enqueue_xcmp_messages(
660				batches_footprints.first_page_pos.saturated_into(),
661				batch_info,
662			);
663
664			match meter.can_consume(required_weight) {
665				true => core::cmp::Ordering::Less,
666				false => core::cmp::Ordering::Greater,
667			}
668		});
669
670		meter.consume(T::WeightInfo::enqueue_xcmp_messages(
671			batches_footprints.first_page_pos.saturated_into(),
672			best_batch_footprint,
673		));
674		T::XcmpQueue::enqueue_messages(
675			xcms.iter()
676				.take(best_batch_footprint.msgs_count)
677				.map(|xcm| xcm.as_bounded_slice()),
678			sender,
679		);
680
681		if best_batch_footprint.msgs_count < xcms.len() {
682			log::error!(
683				"Out of weight: cannot enqueue entire XCMP messages batch; \
684				dropped some or all messages in batch. Used weight: {:?}",
685				meter.consumed_ratio()
686			);
687			return Err(());
688		}
689		Ok(())
690	}
691
692	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
693	/// individual items.
694	///
695	/// We directly encode them again since that is needed later on.
696	///
697	/// On error returns a partial batch with all the XCMs processed before the failure.
698	/// This can happen in case of a decoding/re-encoding failure.
699	pub(crate) fn take_first_concatenated_xcm(
700		data: &mut &[u8],
701		meter: &mut WeightMeter,
702	) -> Result<Option<BoundedVec<u8, MaxXcmpMessageLenOf<T>>>, ()> {
703		if data.is_empty() {
704			return Ok(None)
705		}
706
707		if meter.try_consume(T::WeightInfo::take_first_concatenated_xcm()).is_err() {
708			defensive!("Out of weight; could not decode all; dropping");
709			return Err(())
710		}
711
712		let xcm = VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, data).map_err(
713			|error| {
714				log::debug!(target: LOG_TARGET, "Failed to decode XCM with depth limit: {error:?}");
715				()
716			},
717		)?;
718		Ok(Some(xcm.encode().try_into().map_err(|error| {
719			log::debug!(target: LOG_TARGET, "Failed to encode XCM after decoding: {error:?}");
720			()
721		})?))
722	}
723
724	/// Split concatenated encoded `VersionedXcm`s or `MaybeDoubleEncodedVersionedXcm`s into
725	/// batches.
726	///
727	/// We directly encode them again since that is needed later on.
728	pub(crate) fn take_first_concatenated_xcms(
729		data: &mut &[u8],
730		batch_size: usize,
731		meter: &mut WeightMeter,
732	) -> Result<
733		Vec<BoundedVec<u8, MaxXcmpMessageLenOf<T>>>,
734		Vec<BoundedVec<u8, MaxXcmpMessageLenOf<T>>>,
735	> {
736		let mut batch = vec![];
737		loop {
738			match Self::take_first_concatenated_xcm(data, meter) {
739				Ok(Some(xcm)) => {
740					batch.push(xcm);
741					if batch.len() >= batch_size {
742						return Ok(batch);
743					}
744				},
745				Ok(None) => return Ok(batch),
746				Err(_) => return Err(batch),
747			}
748		}
749	}
750
751	/// The worst-case weight of `on_idle`.
752	pub fn on_idle_weight() -> Weight {
753		<T as crate::Config>::WeightInfo::on_idle_good_msg()
754			.max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
755	}
756
757	#[cfg(feature = "bridging")]
758	fn is_inbound_channel_suspended(sender: ParaId) -> bool {
759		<InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
760	}
761
762	#[cfg(feature = "bridging")]
763	/// Returns tuple of `OutboundState` and number of queued pages.
764	fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
765		<OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
766			let queued_pages = c.last_index.saturating_sub(c.first_index);
767			(c.state, queued_pages)
768		})
769	}
770}
771
772impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
773	// Suspends/Resumes the queue when certain thresholds are reached.
774	fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
775		let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
776
777		let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
778		let suspended = suspended_channels.contains(&para);
779
780		if suspended && fp.ready_pages <= resume_threshold {
781			if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
782				log::error!("defensive: Could not send resumption signal to inbound channel of sibling {:?}: {:?}; channel remains suspended.", para, err);
783			} else {
784				suspended_channels.remove(&para);
785				<InboundXcmpSuspended<T>>::put(suspended_channels);
786			}
787		} else if !suspended && fp.ready_pages >= suspend_threshold {
788			log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
789
790			if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
791				// It will retry if `drop_threshold` is not reached, but it could be too late.
792				log::error!(
793					"defensive: Could not send suspension signal; future messages may be dropped: {:?}", err
794				);
795			} else if let Err(err) = suspended_channels.try_insert(para) {
796				log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
797			} else {
798				<InboundXcmpSuspended<T>>::put(suspended_channels);
799			}
800		}
801	}
802}
803
804impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
805	fn is_paused(para: &ParaId) -> bool {
806		if !QueueSuspended::<T>::get() {
807			return false
808		}
809
810		// Make an exception for the superuser queue:
811		let sender_origin = T::ControllerOriginConverter::convert_origin(
812			(Parent, Parachain((*para).into())),
813			OriginKind::Superuser,
814		);
815		let is_controller =
816			sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
817
818		!is_controller
819	}
820}
821
822impl<T: Config> XcmpMessageHandler for Pallet<T> {
823	fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
824		iter: I,
825		max_weight: Weight,
826	) -> Weight {
827		let mut meter = WeightMeter::with_limit(max_weight);
828
829		let mut known_xcm_senders = BTreeSet::new();
830		for (sender, _sent_at, mut data) in iter {
831			let format = match XcmpMessageFormat::decode(&mut data) {
832				Ok(f) => f,
833				Err(_) => {
834					defensive!("Unknown XCMP message format - dropping");
835					continue
836				},
837			};
838
839			match format {
840				XcmpMessageFormat::Signals => {
841					let mut signal_count = 0;
842					while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
843						signal_count += 1;
844						match ChannelSignal::decode(&mut data) {
845							Ok(ChannelSignal::Suspend) => {
846								if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
847									defensive!(
848										"Not enough weight to process suspend signal - dropping"
849									);
850									break
851								}
852								Self::suspend_channel(sender)
853							},
854							Ok(ChannelSignal::Resume) => {
855								if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
856									defensive!(
857										"Not enough weight to process resume signal - dropping"
858									);
859									break
860								}
861								Self::resume_channel(sender)
862							},
863							Err(_) => {
864								defensive!("Undecodable channel signal - dropping");
865								break
866							},
867						}
868					}
869				},
870				XcmpMessageFormat::ConcatenatedVersionedXcm => {
871					if known_xcm_senders.insert(sender) {
872						if meter
873							.try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
874							.is_err()
875						{
876							defensive!(
877								"Out of weight: cannot enqueue XCMP messages; dropping page; \
878                                    Used weight: ",
879								meter.consumed_ratio()
880							);
881							continue;
882						}
883					}
884
885					let mut can_process_next_batch = true;
886					while can_process_next_batch {
887						let batch = match Self::take_first_concatenated_xcms(
888							&mut data,
889							XCM_BATCH_SIZE,
890							&mut meter,
891						) {
892							Ok(batch) => batch,
893							Err(batch) => {
894								can_process_next_batch = false;
895								defensive!(
896									"HRMP inbound decode stream broke; page will be dropped."
897								);
898								batch
899							},
900						};
901						if batch.is_empty() {
902							break;
903						}
904
905						if let Err(()) = Self::enqueue_xcmp_messages(sender, &batch, &mut meter) {
906							break
907						}
908					}
909				},
910				XcmpMessageFormat::ConcatenatedEncodedBlob => {
911					defensive!("Blob messages are unhandled - dropping");
912					continue
913				},
914			}
915		}
916
917		meter.consumed()
918	}
919}
920
921impl<T: Config> XcmpMessageSource for Pallet<T> {
922	fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
923		let mut statuses = <OutboundXcmpStatus<T>>::get();
924		let old_statuses_len = statuses.len();
925		let max_message_count = statuses.len().min(maximum_channels);
926		let mut result = Vec::with_capacity(max_message_count);
927
928		for status in statuses.iter_mut() {
929			let OutboundChannelDetails {
930				recipient: para_id,
931				state: outbound_state,
932				mut signals_exist,
933				mut first_index,
934				mut last_index,
935			} = *status;
936
937			let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
938				ChannelStatus::Closed => {
939					// This means that there is no such channel anymore. Nothing to be done but
940					// swallow the messages and discard the status.
941					for i in first_index..last_index {
942						<OutboundXcmpMessages<T>>::remove(para_id, i);
943					}
944					if signals_exist {
945						<SignalMessages<T>>::remove(para_id);
946					}
947					*status = OutboundChannelDetails::new(para_id);
948					continue
949				},
950				ChannelStatus::Full => continue,
951				ChannelStatus::Ready(n, e) => (n, e),
952			};
953
954			// This is a hard limit from the host config; not even signals can bypass it.
955			if result.len() == max_message_count {
956				// We check this condition in the beginning of the loop so that we don't include
957				// a message where the limit is 0.
958				break
959			}
960
961			let page = if signals_exist {
962				let page = <SignalMessages<T>>::get(para_id);
963				defensive_assert!(!page.is_empty(), "Signals must exist");
964
965				if page.len() < max_size_now {
966					<SignalMessages<T>>::remove(para_id);
967					signals_exist = false;
968					page
969				} else {
970					defensive!("Signals should fit into a single page");
971					continue
972				}
973			} else if outbound_state == OutboundState::Suspended {
974				// Signals are exempt from suspension.
975				continue
976			} else if last_index > first_index {
977				let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
978				if page.len() < max_size_now {
979					<OutboundXcmpMessages<T>>::remove(para_id, first_index);
980					first_index += 1;
981					page
982				} else {
983					continue
984				}
985			} else {
986				continue
987			};
988			if first_index == last_index {
989				first_index = 0;
990				last_index = 0;
991			}
992
993			if page.len() > max_size_ever {
994				// TODO: #274 This means that the channel's max message size has changed since
995				//   the message was sent. We should parse it and split into smaller messages but
996				//   since it's so unlikely then for now we just drop it.
997				defensive!("WARNING: oversize message in queue - dropping");
998			} else {
999				result.push((para_id, page.into_inner()));
1000			}
1001
1002			let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1003				Some(channel_info) => channel_info.max_total_size,
1004				None => {
1005					log::warn!("calling `get_channel_info` with no RelevantMessagingState?!");
1006					MAX_POSSIBLE_ALLOCATION // We use this as a fallback in case the messaging state is not present
1007				},
1008			};
1009			let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1010			let remaining_total_size: usize = (first_index..last_index)
1011				.map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1012				.sum();
1013			if remaining_total_size <= threshold as usize {
1014				Self::decrease_fee_factor(para_id);
1015			}
1016
1017			*status = OutboundChannelDetails {
1018				recipient: para_id,
1019				state: outbound_state,
1020				signals_exist,
1021				first_index,
1022				last_index,
1023			};
1024		}
1025		debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1026
1027		// Sort the outbound messages by ascending recipient para id to satisfy the acceptance
1028		// criteria requirement.
1029		result.sort_by_key(|m| m.0);
1030
1031		// Prune hrmp channels that became empty. Additionally, because it may so happen that we
1032		// only gave attention to some channels in `non_empty_hrmp_channels` it's important to
1033		// change the order. Otherwise, the next `on_finalize` we will again give attention
1034		// only to those channels that happen to be in the beginning, until they are emptied.
1035		// This leads to "starvation" of the channels near to the end.
1036		//
1037		// To mitigate this we shift all processed elements towards the end of the vector using
1038		// `rotate_left`. To get intuition how it works see the examples in its rustdoc.
1039		statuses.retain(|x| {
1040			x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1041		});
1042
1043		// old_status_len must be >= status.len() since we never add anything to status.
1044		let pruned = old_statuses_len - statuses.len();
1045		// removing an item from status implies a message being sent, so the result messages must
1046		// be no less than the pruned channels.
1047		let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1048			"Could not store HRMP channels config. Some HRMP channels may be broken.",
1049		);
1050
1051		<OutboundXcmpStatus<T>>::put(statuses);
1052
1053		result
1054	}
1055}
1056
1057/// Xcm sender for sending to a sibling parachain.
1058impl<T: Config> SendXcm for Pallet<T> {
1059	type Ticket = (ParaId, VersionedXcm<()>);
1060
1061	fn validate(
1062		dest: &mut Option<Location>,
1063		msg: &mut Option<Xcm<()>>,
1064	) -> SendResult<(ParaId, VersionedXcm<()>)> {
1065		let d = dest.take().ok_or(SendError::MissingArgument)?;
1066
1067		match d.unpack() {
1068			// An HRMP message for a sibling parachain.
1069			(1, [Parachain(id)]) => {
1070				let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1071				let id = ParaId::from(*id);
1072				let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1073				let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1074					.map_err(|()| SendError::DestinationUnsupported)?;
1075				versioned_xcm
1076					.check_is_decodable()
1077					.map_err(|()| SendError::ExceedsMaxMessageSize)?;
1078
1079				Ok(((id, versioned_xcm), price))
1080			},
1081			_ => {
1082				// Anything else is unhandled. This includes a message that is not meant for us.
1083				// We need to make sure that dest/msg is not consumed here.
1084				*dest = Some(d);
1085				Err(SendError::NotApplicable)
1086			},
1087		}
1088	}
1089
1090	fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1091		let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1092
1093		match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1094			Ok(_) => {
1095				Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1096				Ok(hash)
1097			},
1098			Err(e) => {
1099				log::error!(target: LOG_TARGET, "Deliver error: {e:?}");
1100				Err(SendError::Transport(e.into()))
1101			},
1102		}
1103	}
1104}
1105
1106impl<T: Config> InspectMessageQueues for Pallet<T> {
1107	fn clear_messages() {
1108		// Best effort.
1109		let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1110		OutboundXcmpStatus::<T>::mutate(|details_vec| {
1111			for details in details_vec {
1112				details.first_index = 0;
1113				details.last_index = 0;
1114			}
1115		});
1116	}
1117
1118	fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1119		use xcm::prelude::*;
1120
1121		OutboundXcmpMessages::<T>::iter()
1122			.map(|(para_id, _, messages)| {
1123				let mut data = &messages[..];
1124				let decoded_format = XcmpMessageFormat::decode(&mut data).unwrap();
1125				if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
1126					panic!("Unexpected format.")
1127				}
1128				let mut decoded_messages = Vec::new();
1129				while !data.is_empty() {
1130					let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1131						MAX_XCM_DECODE_DEPTH,
1132						&mut data,
1133					)
1134					.unwrap();
1135					decoded_messages.push(decoded_message);
1136				}
1137
1138				(
1139					VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1140					decoded_messages,
1141				)
1142			})
1143			.collect()
1144	}
1145}
1146
1147impl<T: Config> FeeTracker for Pallet<T> {
1148	type Id = ParaId;
1149
1150	fn get_fee_factor(id: Self::Id) -> FixedU128 {
1151		<DeliveryFeeFactor<T>>::get(id)
1152	}
1153
1154	fn set_fee_factor(id: Self::Id, val: FixedU128) {
1155		<DeliveryFeeFactor<T>>::set(id, val);
1156	}
1157}