polkadot_runtime_parachains/
hrmp.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	configuration::{self, HostConfiguration},
19	dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23	vec,
24	vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33	SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38	ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Maximum bound that can be set for inbound channels.
///
/// If inaccurate, the weighing of this pallet might become inaccurate. The `configuration`
/// pallet is expected to check values against this bound before setting them.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Maximum bound that can be set for outbound channels.
///
/// Same caveats as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`].
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions used by this pallet.
///
/// Functions named after an extrinsic provide the weight for that extrinsic; their parameters
/// are the values the extrinsic passes in its `#[pallet::weight(..)]` annotation.
pub trait WeightInfo {
	/// Weight of the `hrmp_init_open_channel` extrinsic.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of the `hrmp_accept_open_channel` extrinsic.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of the `hrmp_close_channel` extrinsic.
	fn hrmp_close_channel() -> Weight;
	/// Weight of the `force_clean_hrmp` extrinsic, where `i` is the witnessed number of inbound
	/// channels and `e` the witnessed number of outbound channels.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of the `force_process_hrmp_open` extrinsic, where `c` is the witnessed number of
	/// pending open channel requests.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of the `force_process_hrmp_close` extrinsic, where `c` is the witnessed number of
	/// pending close channel requests.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of the `hrmp_cancel_open_request` extrinsic, where `c` is the witnessed number of
	/// pending open channel requests.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	// NOTE(review): the caller of this weight function is not visible in this part of the file;
	// presumably `c` is the number of open channel requests being cleaned — confirm.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of the `force_open_hrmp_channel` extrinsic, where `c` is `1` if a pending open
	/// request had to be canceled first and `0` otherwise.
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of the `establish_system_channel` extrinsic.
	fn establish_system_channel() -> Weight;
	/// Weight of the `poke_channel_deposits` extrinsic.
	fn poke_channel_deposits() -> Weight;
	// NOTE(review): presumably the weight of an `establish_channel_with_system` extrinsic, which
	// is not visible in this part of the file — confirm.
	fn establish_channel_with_system() -> Weight;
}
71
72/// A weight info that is only suitable for testing.
73pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76	fn hrmp_accept_open_channel() -> Weight {
77		Weight::MAX
78	}
79	fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80		Weight::MAX
81	}
82	fn force_process_hrmp_close(_: u32) -> Weight {
83		Weight::MAX
84	}
85	fn force_process_hrmp_open(_: u32) -> Weight {
86		Weight::MAX
87	}
88	fn hrmp_cancel_open_request(_: u32) -> Weight {
89		Weight::MAX
90	}
91	fn hrmp_close_channel() -> Weight {
92		Weight::MAX
93	}
94	fn hrmp_init_open_channel() -> Weight {
95		Weight::MAX
96	}
97	fn clean_open_channel_requests(_: u32) -> Weight {
98		Weight::MAX
99	}
100	fn force_open_hrmp_channel(_: u32) -> Weight {
101		Weight::MAX
102	}
103	fn establish_system_channel() -> Weight {
104		Weight::MAX
105	}
106	fn poke_channel_deposits() -> Weight {
107		Weight::MAX
108	}
109	fn establish_channel_with_system() -> Weight {
110		Weight::MAX
111	}
112}
113
/// A description of a request to open an HRMP channel.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Indicates whether this request was confirmed by the recipient.
	pub confirmed: bool,
	/// NOTE: this field is deprecated. Channel open requests became non-expiring and this value
	/// became unused.
	pub _age: SessionIndex,
	/// The amount that the sender supplied as a deposit at the time this request was created.
	pub sender_deposit: Balance,
	/// The maximum size of a single message that could be put into the channel.
	pub max_message_size: u32,
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
}
131
/// The metadata of an HRMP channel.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	// NOTE: This structure is used by parachains via merkle proofs. Therefore, this struct
	// requires special treatment.
	//
	// A parachain that requests this struct can only depend on a subset of it.
	// Specifically, only the first few fields can be depended upon (see `AbridgedHrmpChannel`).
	// These fields cannot be changed without a corresponding migration of parachains.
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The current number of messages pending in the channel.
	/// Invariant: should be less than or equal to `max_capacity`.
	pub msg_count: u32,
	/// The total size in bytes of all message payloads in the channel.
	/// Invariant: should be less than or equal to `max_total_size`.
	pub total_size: u32,
	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
	/// `(prev_head, B, H(M))`, where
	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
	/// - `B`: is the [relay-chain] block number in which a message was appended
	/// - `H(M)`: is the hash of the message being appended.
	/// This value is initialized to a special value that consists of all zeroes which indicates
	/// that no messages were previously added.
	// NOTE(review): the field is an `Option`, so presumably `None` is what marks "no messages
	// were previously added" — confirm against the code that initializes channels.
	pub mqc_head: Option<Hash>,
	/// The amount that the sender supplied as a deposit when opening this channel.
	pub sender_deposit: Balance,
	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
	pub recipient_deposit: Balance,
}
167
/// An error returned by [`Pallet::check_hrmp_watermark`] that indicates an acceptance criteria
/// check didn't pass.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is not advanced relative to the last watermark.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is ahead of the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark doesn't land on a block in which messages were received.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// An error returned by [`Pallet::check_outbound_hrmp`] that indicates an acceptance criteria check
/// didn't pass.
///
/// Where present, `idx` is the index of the offending message in the checked batch.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// More messages were sent than the configuration permits.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The messages are not sorted; `idx` is the first unsorted one.
	NotSorted { idx: u32 },
	/// The message is sent to a channel that doesn't exist.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message exceeds the negotiated maximum message size of the channel.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Sending the message would exceed the negotiated total size of the channel.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Sending the message would exceed the negotiated capacity of the channel.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189	BlockNumber: fmt::Debug,
190{
191	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192		use HrmpWatermarkAcceptanceErr::*;
193		match self {
194			AdvancementRule { new_watermark, last_watermark } => write!(
195				fmt,
196				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197				new_watermark, last_watermark,
198			),
199			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200				fmt,
201				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202				new_watermark, relay_chain_parent_number
203			),
204			LandsOnBlockWithNoMessages { new_watermark } => write!(
205				fmt,
206				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
207				new_watermark
208			),
209		}
210	}
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215		use OutboundHrmpAcceptanceErr::*;
216		match self {
217			MoreMessagesThanPermitted { sent, permitted } => write!(
218				fmt,
219				"more HRMP messages than permitted by config ({} > {})",
220				sent, permitted,
221			),
222			NotSorted { idx } => {
223				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224			},
225			NoSuchChannel { idx, channel_id } => write!(
226				fmt,
227				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228				idx, channel_id.sender, channel_id.recipient,
229			),
230			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231				fmt,
232				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233				idx, msg_size, max_size,
234			),
235			TotalSizeExceeded { idx, total_size, limit } => write!(
236				fmt,
237				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
238				idx, total_size, limit,
239			),
240			CapacityExceeded { idx, count, limit } => write!(
241				fmt,
242				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
243				idx, count, limit,
244			),
245		}
246	}
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251	use super::*;
252
	/// The pallet type of this module.
	#[pallet::pallet]
	// Storage info generation is opted out of; several storage items in this pallet hold `Vec`s
	// without a declared bound.
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
256
	/// Configuration of this pallet. It requires the system, `configuration`, `paras` and `dmp`
	/// pallets to be configured for the same runtime.
	#[pallet::config]
	pub trait Config:
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The outer event type.
		// NOTE(review): presumably the lint is silenced because declaring `RuntimeEvent` on a
		// pallet `Config` is deprecated in the FRAME version in use — confirm.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// The outer origin type. It must be constructible from both the parachains
		/// `crate::Origin` and the system origin, and convertible back into `crate::Origin`
		/// (which is what `ensure_parachain` is given in the extrinsics of this pallet).
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

		/// The origin that can perform "force" actions on channels.
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

		/// An interface for reserving deposits for opening channels.
		///
		/// NOTE that this Currency instance will be charged with the amounts defined in the
		/// `Configuration` pallet. Specifically, that means that the `Balance` of the `Currency`
		/// implementation should be the same as `Balance` as used in the `Configuration`.
		type Currency: ReservableCurrency<Self::AccountId>;

		/// The default channel size and capacity to use when opening a channel to a system
		/// parachain.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

		/// Means of converting an `Xcm` into a `VersionedXcm`. This pallet sends HRMP XCM
		/// notifications to the channel-related parachains, while the `WrapVersion` implementation
		/// attempts to wrap them into the most suitable XCM version for the destination parachain.
		///
		/// NOTE: For example, `pallet_xcm` provides an accurate implementation (recommended), or
		/// the default `()` implementation uses the latest XCM version for all parachains.
		type VersionWrapper: xcm::WrapVersion;

		/// Something that provides the weight of this pallet.
		type WeightInfo: WeightInfo;
	}
294
	/// Events emitted by this pallet.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Open HRMP channel requested.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// A pending HRMP open channel request was canceled by either the sender or the
		/// recipient of that request; `by_parachain` is the para that canceled it.
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Open HRMP channel accepted.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// HRMP channel closure was initiated by `by_parachain`.
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// An HRMP channel was force-opened by the `ChannelManager` origin.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel was opened with a system chain.
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel's deposits were updated.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
328
	/// Errors that can be returned by this pallet.
	#[pallet::error]
	pub enum Error<T> {
		/// The sender tried to open a channel to themselves.
		OpenHrmpChannelToSelf,
		/// The recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		/// The requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		/// The requested capacity exceeds the global limit.
		OpenHrmpChannelCapacityExceedsLimit,
		/// The requested maximum message size is zero.
		OpenHrmpChannelZeroMessageSize,
		/// The open request requested a message size that exceeds the global limit.
		OpenHrmpChannelMessageSizeExceedsLimit,
		/// The channel already exists.
		OpenHrmpChannelAlreadyExists,
		/// There is already a request to open the same channel.
		OpenHrmpChannelAlreadyRequested,
		/// The sender already has the maximum number of allowed outbound channels.
		OpenHrmpChannelLimitExceeded,
		/// The channel from the sender to the origin doesn't exist.
		AcceptHrmpChannelDoesntExist,
		/// The channel is already confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		/// The recipient already has the maximum number of allowed inbound channels.
		AcceptHrmpChannelLimitExceeded,
		/// The origin tries to close a channel where it is neither the sender nor the recipient.
		CloseHrmpChannelUnauthorized,
		/// The channel to be closed doesn't exist.
		CloseHrmpChannelDoesntExist,
		/// The channel close request is already requested.
		CloseHrmpChannelAlreadyUnderway,
		/// Canceling is requested by neither the sender nor recipient of the open channel request.
		CancelHrmpOpenChannelUnauthorized,
		/// The open request doesn't exist.
		OpenHrmpChannelDoesntExist,
		/// Cannot cancel an HRMP open channel request because it is already confirmed.
		OpenHrmpChannelAlreadyConfirmed,
		/// The provided witness data is wrong.
		WrongWitness,
		/// The channel between these two chains cannot be authorized.
		ChannelCreationNotAuthorized,
	}
372
	/// The set of pending HRMP open channel requests.
	///
	/// The set is accompanied by a list for iteration (`HrmpOpenChannelRequestsList`).
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set and vice versa.
	#[pallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;
382
	/// The list of channel ids with a pending HRMP open channel request; its length is what the
	/// `force_process_hrmp_open` and `hrmp_cancel_open_request` extrinsics take as witness data.
	// NOTE: could become bounded, but we don't have a global maximum for this.
	// `HRMP_MAX_INBOUND_CHANNELS_BOUND` are per parachain, while this storage tracks the
	// global state.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
389
	/// This mapping tracks how many open channel requests are initiated by a given sender para.
	/// Invariant: the number of items in `HrmpOpenChannelRequests` of the form `(X, _)` should be
	/// equal to the value of `HrmpOpenChannelRequestCount` for `X`.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
396
	/// This mapping tracks how many open channel requests were accepted by a given recipient para.
	/// Invariant: the number of items in `HrmpOpenChannelRequests` of the form `(_, X)` with
	/// `confirmed` set to true should be equal to the value of `HrmpAcceptedChannelRequestCount`
	/// for `X`.
	#[pallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
403
	/// A set of pending HRMP close channel requests that are going to be closed during the session
	/// change. Used for checking if a given channel is registered for closure.
	///
	/// The set is accompanied by a list for iteration (`HrmpCloseChannelRequestsList`).
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set and vice versa.
	#[pallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;
413
	/// The list of channel ids with a pending HRMP close channel request; its length is what the
	/// `force_process_hrmp_close` extrinsic takes as witness data.
	#[pallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
417
	/// The HRMP watermark (a relay-chain block number) associated with each para.
	/// Invariant:
	/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a
	///   session.
	#[pallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
424
	/// HRMP channel metadata, keyed by channel id (i.e. the `(sender, recipient)` pair).
	/// Invariant:
	/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
	#[pallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
430
	/// Ingress/egress indexes allow finding all the senders and receivers given the opposite side.
	/// I.e.
	///
	/// (a) the ingress index allows finding all the senders for a given recipient.
	/// (b) the egress index allows finding all the recipients for a given sender.
	///
	/// Invariants:
	/// - for each ingress index entry for `P` each item `I` in the index should be present in
	///   `HrmpChannels` as `(I, P)`.
	/// - for each egress index entry for `P` each item `E` in the index should be present in
	///   `HrmpChannels` as `(P, E)`.
	/// - there should be no other dangling channels in `HrmpChannels`.
	/// - the vectors are sorted.
	#[pallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
447
	/// The egress index: for a given sender, all the recipients it has a channel to. The
	/// invariants are stated on `HrmpIngressChannelsIndex`.
	// NOTE that this field is used by parachains via merkle storage proofs, therefore changing
	// the format will require migration of parachains.
	#[pallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
453
	/// Storage for the messages of each channel, keyed by channel id.
	/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
	#[pallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;
464
	/// Maintains a mapping that can be used to answer the question: which paras sent a message at
	/// the given block number to a given receiver?
	///
	/// Invariants:
	/// - The inner `Vec<ParaId>` is never empty.
	/// - The inner `Vec<ParaId>` cannot store the same `ParaId` twice.
	/// - The outer vector is sorted ascending by block number and cannot store two items with the
	///   same block number.
	#[pallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
474
	/// Genesis configuration of this pallet: preopen the given HRMP channels.
	///
	/// The values in each tuple of `preopen_hrmp_channels` correspond to
	/// `(sender, recipient, max_capacity, max_message_size)`, i.e. similar to `init_open_channel`.
	/// In fact, the initialization is performed as if `init_open_channel` and
	/// `accept_open_channel` were called with the respective parameters and the session change
	/// took place.
	///
	/// As such, each channel initializer should satisfy the same constraints, namely:
	///
	/// 1. `max_capacity` and `max_message_size` should be within the limits set by the
	///    configuration pallet.
	/// 2. `sender` and `recipient` must be valid paras.
	#[pallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		/// Marker for the runtime configuration `T`; carries no data and is skipped by serde.
		#[serde(skip)]
		_config: core::marker::PhantomData<T>,
		/// The channels to preopen, as `(sender, recipient, max_capacity, max_message_size)`.
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}
495
496	#[pallet::genesis_build]
497	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
498		fn build(&self) {
499			initialize_storage::<T>(&self.preopen_hrmp_channels);
500		}
501	}
502
503	#[pallet::call]
504	impl<T: Config> Pallet<T> {
505		/// Initiate opening a channel from a parachain to a given recipient with given channel
506		/// parameters.
507		///
508		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
509		/// - `proposed_max_message_size` - specifies the maximum size of the messages.
510		///
511		/// These numbers are a subject to the relay-chain configuration limits.
512		///
513		/// The channel can be opened only after the recipient confirms it and only on a session
514		/// change.
515		#[pallet::call_index(0)]
516		#[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
517		pub fn hrmp_init_open_channel(
518			origin: OriginFor<T>,
519			recipient: ParaId,
520			proposed_max_capacity: u32,
521			proposed_max_message_size: u32,
522		) -> DispatchResult {
523			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
524			Self::init_open_channel(
525				origin,
526				recipient,
527				proposed_max_capacity,
528				proposed_max_message_size,
529			)?;
530			Self::deposit_event(Event::OpenChannelRequested {
531				sender: origin,
532				recipient,
533				proposed_max_capacity,
534				proposed_max_message_size,
535			});
536			Ok(())
537		}
538
539		/// Accept a pending open channel request from the given sender.
540		///
541		/// The channel will be opened only on the next session boundary.
542		#[pallet::call_index(1)]
543		#[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
544		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
545			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
546			Self::accept_open_channel(origin, sender)?;
547			Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
548			Ok(())
549		}
550
551		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
552		/// recipient in the channel being closed.
553		///
554		/// The closure can only happen on a session change.
555		#[pallet::call_index(2)]
556		#[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
557		pub fn hrmp_close_channel(
558			origin: OriginFor<T>,
559			channel_id: HrmpChannelId,
560		) -> DispatchResult {
561			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
562			Self::close_channel(origin, channel_id.clone())?;
563			Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
564			Ok(())
565		}
566
567		/// This extrinsic triggers the cleanup of all the HRMP storage items that a para may have.
568		/// Normally this happens once per session, but this allows you to trigger the cleanup
569		/// immediately for a specific parachain.
570		///
571		/// Number of inbound and outbound channels for `para` must be provided as witness data.
572		///
573		/// Origin must be the `ChannelManager`.
574		#[pallet::call_index(3)]
575		#[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
576		pub fn force_clean_hrmp(
577			origin: OriginFor<T>,
578			para: ParaId,
579			num_inbound: u32,
580			num_outbound: u32,
581		) -> DispatchResult {
582			T::ChannelManager::ensure_origin(origin)?;
583
584			ensure!(
585				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
586					num_inbound as usize,
587				Error::<T>::WrongWitness
588			);
589			ensure!(
590				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
591					num_outbound as usize,
592				Error::<T>::WrongWitness
593			);
594
595			Self::clean_hrmp_after_outgoing(&para);
596			Ok(())
597		}
598
599		/// Force process HRMP open channel requests.
600		///
601		/// If there are pending HRMP open channel requests, you can use this function to process
602		/// all of those requests immediately.
603		///
604		/// Total number of opening channels must be provided as witness data.
605		///
606		/// Origin must be the `ChannelManager`.
607		#[pallet::call_index(4)]
608		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
609		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
610			T::ChannelManager::ensure_origin(origin)?;
611
612			ensure!(
613				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
614					channels,
615				Error::<T>::WrongWitness
616			);
617
618			let host_config = configuration::ActiveConfig::<T>::get();
619			Self::process_hrmp_open_channel_requests(&host_config);
620			Ok(())
621		}
622
623		/// Force process HRMP close channel requests.
624		///
625		/// If there are pending HRMP close channel requests, you can use this function to process
626		/// all of those requests immediately.
627		///
628		/// Total number of closing channels must be provided as witness data.
629		///
630		/// Origin must be the `ChannelManager`.
631		#[pallet::call_index(5)]
632		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
633		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
634			T::ChannelManager::ensure_origin(origin)?;
635
636			ensure!(
637				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
638					channels,
639				Error::<T>::WrongWitness
640			);
641
642			Self::process_hrmp_close_channel_requests();
643			Ok(())
644		}
645
646		/// This cancels a pending open channel request. It can be canceled by either of the sender
647		/// or the recipient for that request. The origin must be either of those.
648		///
649		/// The cancellation happens immediately. It is not possible to cancel the request if it is
650		/// already accepted.
651		///
652		/// Total number of open requests (i.e. `HrmpOpenChannelRequestsList`) must be provided as
653		/// witness data.
654		#[pallet::call_index(6)]
655		#[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
656		pub fn hrmp_cancel_open_request(
657			origin: OriginFor<T>,
658			channel_id: HrmpChannelId,
659			open_requests: u32,
660		) -> DispatchResult {
661			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
662			ensure!(
663				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
664					open_requests,
665				Error::<T>::WrongWitness
666			);
667			Self::cancel_open_request(origin, channel_id.clone())?;
668			Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
669			Ok(())
670		}
671
672		/// Open a channel from a `sender` to a `recipient` `ParaId`. Although opened by governance,
673		/// the `max_capacity` and `max_message_size` are still subject to the Relay Chain's
674		/// configured limits.
675		///
676		/// Expected use is when one (and only one) of the `ParaId`s involved in the channel is
677		/// governed by the system, e.g. a system parachain.
678		///
679		/// Origin must be the `ChannelManager`.
680		#[pallet::call_index(7)]
681		#[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
682		pub fn force_open_hrmp_channel(
683			origin: OriginFor<T>,
684			sender: ParaId,
685			recipient: ParaId,
686			max_capacity: u32,
687			max_message_size: u32,
688		) -> DispatchResultWithPostInfo {
689			T::ChannelManager::ensure_origin(origin)?;
690
691			// Guard against a common footgun where someone makes a channel request to a system
692			// parachain and then makes a proposal to open the channel via governance, which fails
693			// because `init_open_channel` fails if there is an existing request. This check will
694			// clear an existing request such that `init_open_channel` should otherwise succeed.
695			let channel_id = HrmpChannelId { sender, recipient };
696			let cancel_request: u32 =
697				if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
698					Self::cancel_open_request(sender, channel_id)?;
699					1
700				} else {
701					0
702				};
703
704			// Now we proceed with normal init/accept, except that we set `no_deposit` to true such
705			// that it will not require deposits from either member.
706			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
707			Self::accept_open_channel(recipient, sender)?;
708			Self::deposit_event(Event::HrmpChannelForceOpened {
709				sender,
710				recipient,
711				proposed_max_capacity: max_capacity,
712				proposed_max_message_size: max_message_size,
713			});
714
715			Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
716		}
717
718		/// Establish an HRMP channel between two system chains. If the channel does not already
719		/// exist, the transaction fees will be refunded to the caller. The system does not take
720		/// deposits for channels between system chains, and automatically sets the message number
721		/// and size limits to the maximum allowed by the network's configuration.
722		///
723		/// Arguments:
724		///
725		/// - `sender`: A system chain, `ParaId`.
726		/// - `recipient`: A system chain, `ParaId`.
727		///
728		/// Any signed origin can call this function, but _both_ inputs MUST be system chains. If
729		/// the channel does not exist yet, there is no fee.
730		#[pallet::call_index(8)]
731		#[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
732		pub fn establish_system_channel(
733			origin: OriginFor<T>,
734			sender: ParaId,
735			recipient: ParaId,
736		) -> DispatchResultWithPostInfo {
737			let _caller = ensure_signed(origin)?;
738
739			// both chains must be system
740			ensure!(
741				sender.is_system() && recipient.is_system(),
742				Error::<T>::ChannelCreationNotAuthorized
743			);
744
745			let config = configuration::ActiveConfig::<T>::get();
746			let max_message_size = config.hrmp_channel_max_message_size;
747			let max_capacity = config.hrmp_channel_max_capacity;
748
749			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
750			Self::accept_open_channel(recipient, sender)?;
751
752			Self::deposit_event(Event::HrmpSystemChannelOpened {
753				sender,
754				recipient,
755				proposed_max_capacity: max_capacity,
756				proposed_max_message_size: max_message_size,
757			});
758
759			Ok(Pays::No.into())
760		}
761
762		/// Update the deposits held for an HRMP channel to the latest `Configuration`. Channels
763		/// with system chains do not require a deposit.
764		///
765		/// Arguments:
766		///
767		/// - `sender`: A chain, `ParaId`.
768		/// - `recipient`: A chain, `ParaId`.
769		///
770		/// Any signed origin can call this function.
771		#[pallet::call_index(9)]
772		#[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
773		pub fn poke_channel_deposits(
774			origin: OriginFor<T>,
775			sender: ParaId,
776			recipient: ParaId,
777		) -> DispatchResult {
778			let _caller = ensure_signed(origin)?;
779			let channel_id = HrmpChannelId { sender, recipient };
780			let is_system = sender.is_system() || recipient.is_system();
781
782			let config = configuration::ActiveConfig::<T>::get();
783
784			// Channels with and amongst the system do not require a deposit.
785			let (new_sender_deposit, new_recipient_deposit) = if is_system {
786				(0, 0)
787			} else {
788				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
789			};
790
791			HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
792				if let Some(ref mut channel) = channel {
793					let current_sender_deposit = channel.sender_deposit;
794					let current_recipient_deposit = channel.recipient_deposit;
795
796					// nothing to update
797					if current_sender_deposit == new_sender_deposit &&
798						current_recipient_deposit == new_recipient_deposit
799					{
800						return Ok(())
801					}
802
803					// sender
804					if current_sender_deposit > new_sender_deposit {
805						// Can never underflow, but be paranoid.
806						let amount = current_sender_deposit
807							.checked_sub(new_sender_deposit)
808							.ok_or(ArithmeticError::Underflow)?;
809						T::Currency::unreserve(
810							&channel_id.sender.into_account_truncating(),
811							// The difference should always be convertible into `Balance`, but be
812							// paranoid and do nothing in case.
813							amount.try_into().unwrap_or(Zero::zero()),
814						);
815					} else if current_sender_deposit < new_sender_deposit {
816						let amount = new_sender_deposit
817							.checked_sub(current_sender_deposit)
818							.ok_or(ArithmeticError::Underflow)?;
819						T::Currency::reserve(
820							&channel_id.sender.into_account_truncating(),
821							amount.try_into().unwrap_or(Zero::zero()),
822						)?;
823					}
824
825					// recipient
826					if current_recipient_deposit > new_recipient_deposit {
827						let amount = current_recipient_deposit
828							.checked_sub(new_recipient_deposit)
829							.ok_or(ArithmeticError::Underflow)?;
830						T::Currency::unreserve(
831							&channel_id.recipient.into_account_truncating(),
832							amount.try_into().unwrap_or(Zero::zero()),
833						);
834					} else if current_recipient_deposit < new_recipient_deposit {
835						let amount = new_recipient_deposit
836							.checked_sub(current_recipient_deposit)
837							.ok_or(ArithmeticError::Underflow)?;
838						T::Currency::reserve(
839							&channel_id.recipient.into_account_truncating(),
840							amount.try_into().unwrap_or(Zero::zero()),
841						)?;
842					}
843
844					// update storage
845					channel.sender_deposit = new_sender_deposit;
846					channel.recipient_deposit = new_recipient_deposit;
847				} else {
848					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
849				}
850				Ok(())
851			})?;
852
853			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
854
855			Ok(())
856		}
857
858		/// Establish a bidirectional HRMP channel between a parachain and a system chain.
859		///
860		/// Arguments:
861		///
862		/// - `target_system_chain`: A system chain, `ParaId`.
863		///
864		/// The origin needs to be the parachain origin.
865		#[pallet::call_index(10)]
866		#[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
867		pub fn establish_channel_with_system(
868			origin: OriginFor<T>,
869			target_system_chain: ParaId,
870		) -> DispatchResultWithPostInfo {
871			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
872
873			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
874
875			let (max_message_size, max_capacity) =
876				T::DefaultChannelSizeAndCapacityWithSystem::get();
877
878			// create bidirectional channel
879			Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
880			Self::accept_open_channel(target_system_chain, sender)?;
881
882			Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
883			Self::accept_open_channel(sender, target_system_chain)?;
884
885			Self::deposit_event(Event::HrmpSystemChannelOpened {
886				sender,
887				recipient: target_system_chain,
888				proposed_max_capacity: max_capacity,
889				proposed_max_message_size: max_message_size,
890			});
891
892			Self::deposit_event(Event::HrmpSystemChannelOpened {
893				sender: target_system_chain,
894				recipient: sender,
895				proposed_max_capacity: max_capacity,
896				proposed_max_message_size: max_message_size,
897			});
898
899			Ok(Pays::No.into())
900		}
901	}
902}
903
904fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
905	let host_config = configuration::ActiveConfig::<T>::get();
906	for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
907		if let Err(err) =
908			preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
909		{
910			panic!("failed to initialize the genesis storage: {:?}", err);
911		}
912	}
913	Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
914}
915
916fn preopen_hrmp_channel<T: Config>(
917	sender: ParaId,
918	recipient: ParaId,
919	max_capacity: u32,
920	max_message_size: u32,
921) -> DispatchResult {
922	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
923	Pallet::<T>::accept_open_channel(recipient, sender)?;
924	Ok(())
925}
926
927/// Routines and getters related to HRMP.
928impl<T: Config> Pallet<T> {
929	/// Block initialization logic, called by initializer.
930	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
931		Weight::zero()
932	}
933
/// Block finalization hook, invoked by the initializer.
pub(crate) fn initializer_finalize() {
	// Intentionally empty: there is nothing to do for HRMP when a block is finalized.
}
936
937	/// Called by the initializer to note that a new session has started.
938	pub(crate) fn initializer_on_new_session(
939		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
940		outgoing_paras: &[ParaId],
941	) -> Weight {
942		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
943		Self::process_hrmp_open_channel_requests(&notification.prev_config);
944		Self::process_hrmp_close_channel_requests();
945		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
946			outgoing_paras.len() as u32
947		))
948		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
949			outgoing_paras.len() as u32,
950		))
951	}
952
953	/// Iterate over all paras that were noted for offboarding and remove all the data
954	/// associated with them.
955	fn perform_outgoing_para_cleanup(
956		config: &HostConfiguration<BlockNumberFor<T>>,
957		outgoing: &[ParaId],
958	) -> Weight {
959		let mut w = Self::clean_open_channel_requests(config, outgoing);
960		for outgoing_para in outgoing {
961			Self::clean_hrmp_after_outgoing(outgoing_para);
962
963			// we need a few extra bits of data to weigh this -- all of this is read internally
964			// anyways, so no overhead.
965			let ingress_count =
966				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
967			let egress_count =
968				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
969			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
970				ingress_count,
971				egress_count,
972			));
973		}
974		w
975	}
976
977	// Go over the HRMP open channel requests and remove all in which offboarding paras participate.
978	//
979	// This will also perform the refunds for the counterparty if it doesn't offboard.
980	pub(crate) fn clean_open_channel_requests(
981		config: &HostConfiguration<BlockNumberFor<T>>,
982		outgoing: &[ParaId],
983	) -> Weight {
984		// First collect all the channel ids of the open requests in which there is at least one
985		// party presents in the outgoing list.
986		//
987		// Both the open channel request list and outgoing list are expected to be small enough.
988		// In the most common case there will be only single outgoing para.
989		let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
990		let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
991			.into_iter()
992			.partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
993		HrmpOpenChannelRequestsList::<T>::put(stay);
994
995		// Then iterate over all open requests to be removed, pull them out of the set and perform
996		// the refunds if applicable.
997		for req_id in go {
998			let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
999				Some(req_data) => req_data,
1000				None => {
1001					// Can't normally happen but no need to panic.
1002					continue
1003				},
1004			};
1005
1006			// Return the deposit of the sender, but only if it is not the para being offboarded.
1007			if !outgoing.contains(&req_id.sender) {
1008				T::Currency::unreserve(
1009					&req_id.sender.into_account_truncating(),
1010					req_data.sender_deposit.unique_saturated_into(),
1011				);
1012			}
1013
1014			// If the request was confirmed, then it means it was confirmed in the finished session.
1015			// Therefore, the config's hrmp_recipient_deposit represents the actual value of the
1016			// deposit.
1017			//
1018			// We still want to refund the deposit only if the para is not being offboarded.
1019			if req_data.confirmed {
1020				if !outgoing.contains(&req_id.recipient) {
1021					T::Currency::unreserve(
1022						&req_id.recipient.into_account_truncating(),
1023						config.hrmp_recipient_deposit.unique_saturated_into(),
1024					);
1025				}
1026				Self::decrease_accepted_channel_request_count(req_id.recipient);
1027			}
1028		}
1029
1030		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1031	}
1032
1033	/// Remove all storage entries associated with the given para.
1034	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1035		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1036		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1037
1038		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1039			.into_iter()
1040			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1041		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1042			.into_iter()
1043			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1044		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1045		to_close.sort();
1046		to_close.dedup();
1047
1048		for channel in to_close {
1049			Self::close_hrmp_channel(&channel);
1050		}
1051	}
1052
1053	/// Iterate over all open channel requests and:
1054	///
1055	/// - prune the stale requests
1056	/// - enact the confirmed requests
1057	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1058		let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1059		if open_req_channels.is_empty() {
1060			return
1061		}
1062
1063		// iterate the vector starting from the end making our way to the beginning. This way we
1064		// can leverage `swap_remove` to efficiently remove an item during iteration.
1065		let mut idx = open_req_channels.len();
1066		loop {
1067			// bail if we've iterated over all items.
1068			if idx == 0 {
1069				break
1070			}
1071
1072			idx -= 1;
1073			let channel_id = open_req_channels[idx].clone();
1074			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1075				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
1076			);
1077
1078			let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1079			let sender_deposit = request.sender_deposit;
1080			let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1081
1082			if request.confirmed {
1083				if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1084					paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1085				{
1086					HrmpChannels::<T>::insert(
1087						&channel_id,
1088						HrmpChannel {
1089							sender_deposit,
1090							recipient_deposit,
1091							max_capacity: request.max_capacity,
1092							max_total_size: request.max_total_size,
1093							max_message_size: request.max_message_size,
1094							msg_count: 0,
1095							total_size: 0,
1096							mqc_head: None,
1097						},
1098					);
1099
1100					HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1101						if let Err(i) = v.binary_search(&channel_id.sender) {
1102							v.insert(i, channel_id.sender);
1103						}
1104					});
1105					HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1106						if let Err(i) = v.binary_search(&channel_id.recipient) {
1107							v.insert(i, channel_id.recipient);
1108						}
1109					});
1110				}
1111
1112				Self::decrease_open_channel_request_count(channel_id.sender);
1113				Self::decrease_accepted_channel_request_count(channel_id.recipient);
1114
1115				let _ = open_req_channels.swap_remove(idx);
1116				HrmpOpenChannelRequests::<T>::remove(&channel_id);
1117			}
1118		}
1119
1120		HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1121	}
1122
1123	/// Iterate over all close channel requests unconditionally closing the channels.
1124	fn process_hrmp_close_channel_requests() {
1125		let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1126		for condemned_ch_id in close_reqs {
1127			HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1128			Self::close_hrmp_channel(&condemned_ch_id);
1129		}
1130	}
1131
1132	/// Close and remove the designated HRMP channel.
1133	///
1134	/// This includes returning the deposits.
1135	///
1136	/// This function is idempotent, meaning that after the first application it should have no
1137	/// effect (i.e. it won't return the deposits twice).
1138	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1139		if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1140			HrmpChannels::<T>::take(channel_id)
1141		{
1142			T::Currency::unreserve(
1143				&channel_id.sender.into_account_truncating(),
1144				sender_deposit.unique_saturated_into(),
1145			);
1146			T::Currency::unreserve(
1147				&channel_id.recipient.into_account_truncating(),
1148				recipient_deposit.unique_saturated_into(),
1149			);
1150		}
1151
1152		HrmpChannelContents::<T>::remove(channel_id);
1153
1154		HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1155			if let Ok(i) = v.binary_search(&channel_id.recipient) {
1156				v.remove(i);
1157			}
1158		});
1159		HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1160			if let Ok(i) = v.binary_search(&channel_id.sender) {
1161				v.remove(i);
1162			}
1163		});
1164	}
1165
1166	/// Check that the candidate of the given recipient controls the HRMP watermark properly.
1167	pub(crate) fn check_hrmp_watermark(
1168		recipient: ParaId,
1169		relay_chain_parent_number: BlockNumberFor<T>,
1170		new_hrmp_watermark: BlockNumberFor<T>,
1171	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1172		// First, check where the watermark CANNOT legally land.
1173		//
1174		// (a) For ensuring that messages are eventually processed, we require each parablock's
1175		//     watermark to be greater than the last one. The exception to this is if the previous
1176		//     watermark was already equal to the current relay-parent number.
1177		//
1178		// (b) However, a parachain cannot read into "the future", therefore the watermark should
1179		//     not be greater than the relay-chain context block which the parablock refers to.
1180		if new_hrmp_watermark == relay_chain_parent_number {
1181			return Ok(())
1182		}
1183
1184		if new_hrmp_watermark > relay_chain_parent_number {
1185			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1186				new_watermark: new_hrmp_watermark,
1187				relay_chain_parent_number,
1188			})
1189		}
1190
1191		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1192			if new_hrmp_watermark <= last_watermark {
1193				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1194					new_watermark: new_hrmp_watermark,
1195					last_watermark,
1196				})
1197			}
1198		}
1199
1200		// Second, check where the watermark CAN land. It's one of the following:
1201		//
1202		// (a) The relay parent block number (checked above).
1203		// (b) A relay-chain block in which this para received at least one message (checked here)
1204		let digest = HrmpChannelDigests::<T>::get(&recipient);
1205		if !digest
1206			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1207			.is_ok()
1208		{
1209			return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1210				new_watermark: new_hrmp_watermark,
1211			})
1212		}
1213		Ok(())
1214	}
1215
1216	/// Returns HRMP watermarks of previously sent messages to a given para.
1217	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1218		HrmpChannelDigests::<T>::get(&recipient)
1219			.into_iter()
1220			.map(|(block_no, _)| block_no)
1221			.collect()
1222	}
1223
1224	pub(crate) fn check_outbound_hrmp(
1225		config: &HostConfiguration<BlockNumberFor<T>>,
1226		sender: ParaId,
1227		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1228	) -> Result<(), OutboundHrmpAcceptanceErr> {
1229		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1230			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1231				sent: out_hrmp_msgs.len() as u32,
1232				permitted: config.hrmp_max_message_num_per_candidate,
1233			})
1234		}
1235
1236		let mut last_recipient = None::<ParaId>;
1237
1238		for (idx, out_msg) in
1239			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1240		{
1241			match last_recipient {
1242				// the messages must be sorted in ascending order and there must be no two messages
1243				// sent to the same recipient. Thus we can check that every recipient is strictly
1244				// greater than the previous one.
1245				Some(last_recipient) if out_msg.recipient <= last_recipient =>
1246					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
1247				_ => last_recipient = Some(out_msg.recipient),
1248			}
1249
1250			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1251
1252			let channel = match HrmpChannels::<T>::get(&channel_id) {
1253				Some(channel) => channel,
1254				None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1255			};
1256
1257			let msg_size = out_msg.data.len() as u32;
1258			if msg_size > channel.max_message_size {
1259				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1260					idx,
1261					msg_size,
1262					max_size: channel.max_message_size,
1263				})
1264			}
1265
1266			let new_total_size = channel.total_size + out_msg.data.len() as u32;
1267			if new_total_size > channel.max_total_size {
1268				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1269					idx,
1270					total_size: new_total_size,
1271					limit: channel.max_total_size,
1272				})
1273			}
1274
1275			let new_msg_count = channel.msg_count + 1;
1276			if new_msg_count > channel.max_capacity {
1277				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1278					idx,
1279					count: new_msg_count,
1280					limit: channel.max_capacity,
1281				})
1282			}
1283		}
1284
1285		Ok(())
1286	}
1287
1288	/// Returns remaining outbound channels capacity in messages and in bytes per recipient para.
1289	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1290		let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1291		let mut remaining = Vec::with_capacity(recipients.len());
1292
1293		for recipient in recipients {
1294			let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1295				continue
1296			};
1297			remaining.push((
1298				recipient,
1299				(
1300					channel.max_capacity - channel.msg_count,
1301					channel.max_total_size - channel.total_size,
1302				),
1303			));
1304		}
1305
1306		remaining
1307	}
1308
1309	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1310		// sift through the incoming messages digest to collect the paras that sent at least one
1311		// message to this parachain between the old and new watermarks.
1312		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1313			let mut senders = BTreeSet::new();
1314			let mut leftover = Vec::with_capacity(digest.len());
1315			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1316				if block_no <= new_hrmp_watermark {
1317					senders.extend(paras_sent_msg);
1318				} else {
1319					leftover.push((block_no, paras_sent_msg));
1320				}
1321			}
1322			*digest = leftover;
1323			senders
1324		});
1325
1326		// having all senders we can trivially find out the channels which we need to prune.
1327		let channels_to_prune =
1328			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1329		for channel_id in channels_to_prune {
1330			// prune each channel up to the new watermark keeping track how many messages we removed
1331			// and what is the total byte size of them.
1332			let (mut pruned_cnt, mut pruned_size) = (0, 0);
1333
1334			let contents = HrmpChannelContents::<T>::get(&channel_id);
1335			let mut leftover = Vec::with_capacity(contents.len());
1336			for msg in contents {
1337				if msg.sent_at <= new_hrmp_watermark {
1338					pruned_cnt += 1;
1339					pruned_size += msg.data.len();
1340				} else {
1341					leftover.push(msg);
1342				}
1343			}
1344			if !leftover.is_empty() {
1345				HrmpChannelContents::<T>::insert(&channel_id, leftover);
1346			} else {
1347				HrmpChannelContents::<T>::remove(&channel_id);
1348			}
1349
1350			// update the channel metadata.
1351			HrmpChannels::<T>::mutate(&channel_id, |channel| {
1352				if let Some(ref mut channel) = channel {
1353					channel.msg_count -= pruned_cnt as u32;
1354					channel.total_size -= pruned_size as u32;
1355				}
1356			});
1357		}
1358
1359		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1360	}
1361
1362	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
1363	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1364		let now = frame_system::Pallet::<T>::block_number();
1365
1366		for out_msg in out_hrmp_msgs {
1367			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1368
1369			let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1370				Some(channel) => channel,
1371				None => {
1372					// apparently, that since acceptance of this candidate the recipient was
1373					// offboarded and the channel no longer exists.
1374					continue
1375				},
1376			};
1377
1378			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1379
1380			// book keeping
1381			channel.msg_count += 1;
1382			channel.total_size += inbound.data.len() as u32;
1383
1384			// compute the new MQC head of the channel
1385			let prev_head = channel.mqc_head.unwrap_or(Default::default());
1386			let new_head = BlakeTwo256::hash_of(&(
1387				prev_head,
1388				inbound.sent_at,
1389				T::Hashing::hash_of(&inbound.data),
1390			));
1391			channel.mqc_head = Some(new_head);
1392
1393			HrmpChannels::<T>::insert(&channel_id, channel);
1394			HrmpChannelContents::<T>::append(&channel_id, inbound);
1395
1396			// The digests are sorted in ascending by block number order. There are only two
1397			// possible scenarios here ("the current" is the block of candidate's inclusion):
1398			//
1399			// (a) It's the first time anybody sends a message to this recipient within this block.
1400			//     In this case, the digest vector would be empty or the block number of the latest
1401			//     entry is smaller than the current.
1402			//
1403			// (b) Somebody has already sent a message within the current block. That means that
1404			//     the block number of the latest entry is equal to the current.
1405			//
1406			// Note that having the latest entry greater than the current block number is a logical
1407			// error.
1408			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1409			if let Some(cur_block_digest) = recipient_digest
1410				.last_mut()
1411				.filter(|(block_no, _)| *block_no == now)
1412				.map(|(_, ref mut d)| d)
1413			{
1414				cur_block_digest.push(sender);
1415			} else {
1416				recipient_digest.push((now, vec![sender]));
1417			}
1418			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1419		}
1420	}
1421
1422	/// Initiate opening a channel from a parachain to a given recipient with given channel
1423	/// parameters. If neither chain is part of the system, then a deposit from the `Configuration`
1424	/// will be required for `origin` (the sender) upon opening the request and the `recipient` upon
1425	/// accepting it.
1426	///
1427	/// Basically the same as [`hrmp_init_open_channel`](Pallet::hrmp_init_open_channel) but
1428	/// intended for calling directly from other pallets rather than dispatched.
1429	pub fn init_open_channel(
1430		origin: ParaId,
1431		recipient: ParaId,
1432		proposed_max_capacity: u32,
1433		proposed_max_message_size: u32,
1434	) -> DispatchResult {
1435		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1436		ensure!(
1437			paras::Pallet::<T>::is_valid_para(recipient),
1438			Error::<T>::OpenHrmpChannelInvalidRecipient,
1439		);
1440
1441		let config = configuration::ActiveConfig::<T>::get();
1442		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1443		ensure!(
1444			proposed_max_capacity <= config.hrmp_channel_max_capacity,
1445			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1446		);
1447		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1448		ensure!(
1449			proposed_max_message_size <= config.hrmp_channel_max_message_size,
1450			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1451		);
1452
1453		let channel_id = HrmpChannelId { sender: origin, recipient };
1454		ensure!(
1455			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1456			Error::<T>::OpenHrmpChannelAlreadyRequested,
1457		);
1458		ensure!(
1459			HrmpChannels::<T>::get(&channel_id).is_none(),
1460			Error::<T>::OpenHrmpChannelAlreadyExists,
1461		);
1462
1463		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1464		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1465		let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1466		ensure!(
1467			egress_cnt + open_req_cnt < channel_num_limit,
1468			Error::<T>::OpenHrmpChannelLimitExceeded,
1469		);
1470
1471		// Do not require deposits for channels with or amongst the system.
1472		let is_system = origin.is_system() || recipient.is_system();
1473		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1474		if !deposit.is_zero() {
1475			T::Currency::reserve(
1476				&origin.into_account_truncating(),
1477				deposit.unique_saturated_into(),
1478			)?;
1479		}
1480
1481		// mutating storage directly now -- shall not bail henceforth.
1482
1483		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1484		HrmpOpenChannelRequests::<T>::insert(
1485			&channel_id,
1486			HrmpOpenChannelRequest {
1487				confirmed: false,
1488				_age: 0,
1489				sender_deposit: deposit,
1490				max_capacity: proposed_max_capacity,
1491				max_message_size: proposed_max_message_size,
1492				max_total_size: config.hrmp_channel_max_total_size,
1493			},
1494		);
1495		HrmpOpenChannelRequestsList::<T>::append(channel_id);
1496
1497		Self::send_to_para(
1498			"init_open_channel",
1499			&config,
1500			recipient,
1501			Self::wrap_notification(|| {
1502				use xcm::opaque::latest::{prelude::*, Xcm};
1503				Xcm(vec![HrmpNewChannelOpenRequest {
1504					sender: origin.into(),
1505					max_capacity: proposed_max_capacity,
1506					max_message_size: proposed_max_message_size,
1507				}])
1508			}),
1509		);
1510
1511		Ok(())
1512	}
1513
1514	/// Accept a pending open channel request from the given sender.
1515	///
1516	/// Basically the same as [`hrmp_accept_open_channel`](Pallet::hrmp_accept_open_channel) but
1517	/// intended for calling directly from other pallets rather than dispatched.
1518	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1519		let channel_id = HrmpChannelId { sender, recipient: origin };
1520		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1521			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1522		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1523
1524		// check if by accepting this open channel request, this parachain would exceed the
1525		// number of inbound channels.
1526		let config = configuration::ActiveConfig::<T>::get();
1527		let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1528		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1529		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1530		ensure!(
1531			ingress_cnt + accepted_cnt < channel_num_limit,
1532			Error::<T>::AcceptHrmpChannelLimitExceeded,
1533		);
1534
1535		// Do not require deposits for channels with or amongst the system.
1536		let is_system = origin.is_system() || sender.is_system();
1537		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1538		if !deposit.is_zero() {
1539			T::Currency::reserve(
1540				&origin.into_account_truncating(),
1541				deposit.unique_saturated_into(),
1542			)?;
1543		}
1544
1545		// persist the updated open channel request and then increment the number of accepted
1546		// channels.
1547		channel_req.confirmed = true;
1548		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1549		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1550
1551		Self::send_to_para(
1552			"accept_open_channel",
1553			&config,
1554			sender,
1555			Self::wrap_notification(|| {
1556				use xcm::opaque::latest::{prelude::*, Xcm};
1557				Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1558			}),
1559		);
1560
1561		Ok(())
1562	}
1563
1564	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1565		// check if the origin is allowed to close the channel.
1566		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1567
1568		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1569			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1570		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1571
1572		// Remove the request by the channel id and sync the accompanying list with the set.
1573		HrmpOpenChannelRequests::<T>::remove(&channel_id);
1574		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1575			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1576				open_req_channels.swap_remove(pos);
1577			}
1578		});
1579
1580		Self::decrease_open_channel_request_count(channel_id.sender);
1581		// Don't decrease `HrmpAcceptedChannelRequestCount` because we don't consider confirmed
1582		// requests here.
1583
1584		// Unreserve the sender's deposit. The recipient could not have left their deposit because
1585		// we ensured that the request is not confirmed.
1586		T::Currency::unreserve(
1587			&channel_id.sender.into_account_truncating(),
1588			open_channel_req.sender_deposit.unique_saturated_into(),
1589		);
1590
1591		Ok(())
1592	}
1593
1594	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1595		// check if the origin is allowed to close the channel.
1596		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1597
1598		// check if the channel requested to close does exist.
1599		ensure!(
1600			HrmpChannels::<T>::get(&channel_id).is_some(),
1601			Error::<T>::CloseHrmpChannelDoesntExist,
1602		);
1603
1604		// check that there is no outstanding close request for this channel
1605		ensure!(
1606			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1607			Error::<T>::CloseHrmpChannelAlreadyUnderway,
1608		);
1609
1610		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1611		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1612
1613		let config = configuration::ActiveConfig::<T>::get();
1614		let opposite_party =
1615			if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1616
1617		Self::send_to_para(
1618			"close_channel",
1619			&config,
1620			opposite_party,
1621			Self::wrap_notification(|| {
1622				use xcm::opaque::latest::{prelude::*, Xcm};
1623				Xcm(vec![HrmpChannelClosing {
1624					initiator: origin.into(),
1625					sender: channel_id.sender.into(),
1626					recipient: channel_id.recipient.into(),
1627				}])
1628			}),
1629		);
1630
1631		Ok(())
1632	}
1633
1634	/// Returns the list of MQC heads for the inbound channels of the given recipient para paired
1635	/// with the sender para ids. This vector is sorted ascending by the para id and doesn't contain
1636	/// multiple entries with the same sender.
1637	#[cfg(test)]
1638	fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
1639		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1640
1641		// The ingress channels vector is sorted, thus `mqc_heads` is sorted as well.
1642		let mut mqc_heads = Vec::with_capacity(sender_set.len());
1643		for sender in sender_set {
1644			let channel_metadata = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient });
1645			let mqc_head = channel_metadata
1646				.and_then(|metadata| metadata.mqc_head)
1647				.unwrap_or(Hash::default());
1648			mqc_heads.push((sender, mqc_head));
1649		}
1650
1651		mqc_heads
1652	}
1653
1654	/// Returns contents of all channels addressed to the given recipient. Channels that have no
1655	/// messages in them are also included.
1656	pub(crate) fn inbound_hrmp_channels_contents(
1657		recipient: ParaId,
1658	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1659		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1660
1661		let mut inbound_hrmp_channels_contents = BTreeMap::new();
1662		for sender in sender_set {
1663			let channel_contents =
1664				HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1665			inbound_hrmp_channels_contents.insert(sender, channel_contents);
1666		}
1667
1668		inbound_hrmp_channels_contents
1669	}
1670}
1671
1672impl<T: Config> Pallet<T> {
1673	/// Decreases the open channel request count for the given sender. If the value reaches zero
1674	/// it is removed completely.
1675	fn decrease_open_channel_request_count(sender: ParaId) {
1676		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1677			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1678				0 => None,
1679				n => Some(n),
1680			});
1681		});
1682	}
1683
1684	/// Decreases the accepted channel request count for the given sender. If the value reaches
1685	/// zero it is removed completely.
1686	fn decrease_accepted_channel_request_count(recipient: ParaId) {
1687		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1688			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1689				0 => None,
1690				n => Some(n),
1691			});
1692		});
1693	}
1694
1695	#[cfg(any(feature = "runtime-benchmarks", test))]
1696	fn assert_storage_consistency_exhaustive() {
1697		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1698			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1699		}
1700
1701		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1702			for para in paras {
1703				assert!(
1704					crate::paras::Pallet::<T>::is_valid_para(para),
1705					"{}: {:?} para is offboarded",
1706					cause,
1707					para
1708				);
1709			}
1710		};
1711
1712		assert_eq!(
1713			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1714			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1715		);
1716
1717		// verify that the set of keys in `HrmpOpenChannelRequestCount` corresponds to the set
1718		// of _senders_ in `HrmpOpenChannelRequests`.
1719		//
1720		// having ensured that, we can go ahead and go over all counts and verify that they match.
1721		assert_eq!(
1722			HrmpOpenChannelRequestCount::<T>::iter()
1723				.map(|(k, _)| k)
1724				.collect::<BTreeSet<_>>(),
1725			HrmpOpenChannelRequests::<T>::iter()
1726				.map(|(k, _)| k.sender)
1727				.collect::<BTreeSet<_>>(),
1728		);
1729		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1730			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1731				.filter(|(ch, _)| ch.sender == open_channel_initiator)
1732				.count() as u32;
1733			assert_eq!(expected_num, actual_num);
1734		}
1735
1736		// The same as above, but for accepted channel request count. Note that we are interested
1737		// only in confirmed open requests.
1738		assert_eq!(
1739			HrmpAcceptedChannelRequestCount::<T>::iter()
1740				.map(|(k, _)| k)
1741				.collect::<BTreeSet<_>>(),
1742			HrmpOpenChannelRequests::<T>::iter()
1743				.filter(|(_, v)| v.confirmed)
1744				.map(|(k, _)| k.recipient)
1745				.collect::<BTreeSet<_>>(),
1746		);
1747		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1748			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1749				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1750				.count() as u32;
1751			assert_eq!(expected_num, actual_num);
1752		}
1753
1754		assert_eq!(
1755			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1756			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1757		);
1758
1759		// A HRMP watermark can be None for an onboarded parachain. However, an offboarded parachain
1760		// cannot have an HRMP watermark: it should've been cleanup.
1761		assert_contains_only_onboarded(
1762			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1763			"HRMP watermarks should contain only onboarded paras",
1764		);
1765
1766		// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
1767		// have contents.
1768		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1769			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1770
1771			// pedantic check: there should be no empty vectors in storage, those should be modeled
1772			// by a removed kv pair.
1773			assert!(!contents.is_empty());
1774		}
1775
1776		// Senders and recipients must be onboarded. Otherwise, all channels associated with them
1777		// are removed.
1778		assert_contains_only_onboarded(
1779			HrmpChannels::<T>::iter()
1780				.flat_map(|(k, _)| vec![k.sender, k.recipient])
1781				.collect::<Vec<_>>(),
1782			"senders and recipients in all channels should be onboarded",
1783		);
1784
1785		// Check the docs for `HrmpIngressChannelsIndex` and `HrmpEgressChannelsIndex` in decl
1786		// storage to get an index what are the channel mappings indexes.
1787		//
1788		// Here, from indexes.
1789		//
1790		// ingress         egress
1791		//
1792		// a -> [x, y]     x -> [a, b]
1793		// b -> [x, z]     y -> [a]
1794		//                 z -> [b]
1795		//
1796		// we derive a list of channels they represent.
1797		//
1798		//   (a, x)         (a, x)
1799		//   (a, y)         (a, y)
1800		//   (b, x)         (b, x)
1801		//   (b, z)         (b, z)
1802		//
1803		// and then that we compare that to the channel list in the `HrmpChannels`.
1804		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1805			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1806			.collect::<BTreeSet<_>>();
1807		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1808			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1809			.collect::<BTreeSet<_>>();
1810		let channel_set_ground_truth = HrmpChannels::<T>::iter()
1811			.map(|(k, _)| (k.sender, k.recipient))
1812			.collect::<BTreeSet<_>>();
1813		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1814		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1815
1816		HrmpIngressChannelsIndex::<T>::iter()
1817			.map(|(_, v)| v)
1818			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1819		HrmpEgressChannelsIndex::<T>::iter()
1820			.map(|(_, v)| v)
1821			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1822
1823		assert_contains_only_onboarded(
1824			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1825			"HRMP channel digests should contain only onboarded paras",
1826		);
1827		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1828			// Assert that items are in **strictly** ascending order. The strictness also implies
1829			// there are no duplicates.
1830			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1831
1832			for (_, mut senders) in digest {
1833				assert!(!senders.is_empty());
1834
1835				// check for duplicates. For that we sort the vector, then perform deduplication.
1836				// if the vector stayed the same, there are no duplicates.
1837				senders.sort();
1838				let orig_senders = senders.clone();
1839				senders.dedup();
1840				assert_eq!(
1841					orig_senders, senders,
1842					"duplicates removed implies existence of duplicates"
1843				);
1844			}
1845		}
1846	}
1847}
1848
1849impl<T: Config> Pallet<T> {
1850	/// Wraps HRMP XCM notifications to the most suitable XCM version for the destination para.
1851	/// If the XCM version is unknown, the latest XCM version is used as a best effort.
1852	fn wrap_notification(
1853		mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1854	) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1855		use xcm::{
1856			opaque::VersionedXcm,
1857			prelude::{Junction, Location},
1858			WrapVersion,
1859		};
1860
1861		// Return a closure that can prepare notifications.
1862		move |dest| {
1863			// Attempt to wrap the notification for the destination parachain.
1864			T::VersionWrapper::wrap_version(
1865				&Location::new(0, [Junction::Parachain(dest.into())]),
1866				notification(),
1867			)
1868			.unwrap_or_else(|_| {
1869				// As a best effort, if we cannot resolve the version, fallback to using the latest
1870				// version.
1871				VersionedXcm::from(notification())
1872			})
1873			.encode()
1874		}
1875	}
1876
1877	/// Sends/enqueues notification to the destination parachain.
1878	fn send_to_para(
1879		log_label: &str,
1880		config: &HostConfiguration<BlockNumberFor<T>>,
1881		dest: ParaId,
1882		notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1883	) {
1884		// prepare notification
1885		let notification_bytes = notification_bytes_for(dest);
1886
1887		// try to enqueue
1888		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1889			dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1890		{
1891			// this should never happen unless the max downward message size is configured to a
1892			// jokingly small number.
1893			log::error!(
1894				target: "runtime::hrmp",
1895				"sending '{log_label}::notification_bytes' failed."
1896			);
1897			debug_assert!(false);
1898		}
1899	}
1900}