polkadot_runtime_parachains/
hrmp.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	configuration::{self, HostConfiguration},
19	dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23	vec,
24	vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33	SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38	ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Maximum bound that can be set for inbound channels.
///
/// If inaccurate, the weighing of this pallet might become inaccurate. The `configuration`
/// pallet is expected to check values against this bound before setting them.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Same as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`], but for outbound channels.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions used by this pallet.
///
/// Most entries are named after the dispatchable whose `#[pallet::weight]` attribute uses them.
pub trait WeightInfo {
	/// Weight of `hrmp_init_open_channel`.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of `hrmp_accept_open_channel`.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of `hrmp_close_channel`.
	fn hrmp_close_channel() -> Weight;
	/// Weight of `force_clean_hrmp`; it is invoked with the `num_inbound` (`i`) and
	/// `num_outbound` (`e`) witness values of that call.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of `force_process_hrmp_open`; `c` is the `channels` witness of that call.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of `force_process_hrmp_close`; `c` is the `channels` witness of that call.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of `hrmp_cancel_open_request`; `c` is the `open_requests` witness of that call.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	// NOTE(review): no caller is visible in this part of the file; `c` is presumably a number of
	// open channel requests — confirm against the caller.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of `force_open_hrmp_channel`; `c` is `1` when an existing open request has to be
	/// canceled first and `0` otherwise (the pre-dispatch weight uses `1`).
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of `establish_system_channel`.
	fn establish_system_channel() -> Weight;
	/// Weight of `poke_channel_deposits`.
	fn poke_channel_deposits() -> Weight;
	// NOTE(review): presumably the weight of a dispatchable of the same name, which is not
	// visible in this part of the file — confirm.
	fn establish_channel_with_system() -> Weight;
}
71
72/// A weight info that is only suitable for testing.
73pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76	fn hrmp_accept_open_channel() -> Weight {
77		Weight::MAX
78	}
79	fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80		Weight::MAX
81	}
82	fn force_process_hrmp_close(_: u32) -> Weight {
83		Weight::MAX
84	}
85	fn force_process_hrmp_open(_: u32) -> Weight {
86		Weight::MAX
87	}
88	fn hrmp_cancel_open_request(_: u32) -> Weight {
89		Weight::MAX
90	}
91	fn hrmp_close_channel() -> Weight {
92		Weight::MAX
93	}
94	fn hrmp_init_open_channel() -> Weight {
95		Weight::MAX
96	}
97	fn clean_open_channel_requests(_: u32) -> Weight {
98		Weight::MAX
99	}
100	fn force_open_hrmp_channel(_: u32) -> Weight {
101		Weight::MAX
102	}
103	fn establish_system_channel() -> Weight {
104		Weight::MAX
105	}
106	fn poke_channel_deposits() -> Weight {
107		Weight::MAX
108	}
109	fn establish_channel_with_system() -> Weight {
110		Weight::MAX
111	}
112}
113
/// A description of a request to open an HRMP channel.
///
/// Values of this type are stored in `HrmpOpenChannelRequests`, keyed by the `HrmpChannelId` of
/// the channel to be opened.
// NOTE(review): the type derives `Encode`/`Decode`, so the field order is presumably part of the
// storage format — do not reorder fields without a migration.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Indicates if this request was confirmed by the recipient.
	pub confirmed: bool,
	/// NOTE: this field is deprecated. Channel open requests became non-expiring and this value
	/// became unused.
	pub _age: SessionIndex,
	/// The amount that the sender supplied at the time of creation of this request.
	pub sender_deposit: Balance,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
}
131
/// The metadata of an HRMP channel.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	// NOTE: This structure is used by parachains via merkle proofs. Therefore, this struct
	// requires special treatment.
	//
	// A parachain that requests this struct can only depend on a subset of it. Specifically,
	// only the first few fields can be depended upon (see `AbridgedHrmpChannel`).
	// These fields cannot be changed without a corresponding migration of parachains.
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The current number of messages pending in the channel.
	/// Invariant: should be less than or equal to `max_capacity`.
	pub msg_count: u32,
	/// The total size in bytes of all message payloads in the channel.
	/// Invariant: should be less than or equal to `max_total_size`.
	pub total_size: u32,
	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
	/// `(prev_head, B, H(M))`, where
	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
	/// - `B`: is the [relay-chain] block number in which a message was appended
	/// - `H(M)`: is the hash of the message being appended.
	///
	/// This value is initialized to a special value that consists of all zeroes which indicates
	/// that no messages were previously added.
	// NOTE(review): the field is an `Option`, so "no messages yet" is presumably represented by
	// `None` rather than by an all-zero hash — confirm against the code that creates channels.
	pub mqc_head: Option<Hash>,
	/// The amount that the sender supplied as a deposit when opening this channel.
	pub sender_deposit: Balance,
	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
	pub recipient_deposit: Balance,
}
167
/// An error returned by [`Pallet::check_hrmp_watermark`] that indicates an acceptance criteria
/// check didn't pass.
///
/// The `Debug` implementation renders each variant as a human-readable message.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is not advanced relative to the last watermark.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is ahead of the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark doesn't land on a block with messages received.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// An error returned by [`Pallet::check_outbound_hrmp`] that indicates an acceptance criteria check
/// didn't pass.
///
/// Where present, `idx` is the index of the offending message. The `Debug` implementation
/// renders each variant as a human-readable message.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// More HRMP messages were sent than permitted by the configuration.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The messages are not sorted; `idx` is the index of the first unsorted message.
	NotSorted { idx: u32 },
	/// The message is sent to a channel that does not exist.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message exceeds the negotiated maximum message size of the channel.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Sending the message would exceed the negotiated total size of the channel.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Sending the message would exceed the negotiated capacity of the channel.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189	BlockNumber: fmt::Debug,
190{
191	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192		use HrmpWatermarkAcceptanceErr::*;
193		match self {
194			AdvancementRule { new_watermark, last_watermark } => write!(
195				fmt,
196				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197				new_watermark, last_watermark,
198			),
199			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200				fmt,
201				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202				new_watermark, relay_chain_parent_number
203			),
204			LandsOnBlockWithNoMessages { new_watermark } => write!(
205				fmt,
206				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
207				new_watermark
208			),
209		}
210	}
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215		use OutboundHrmpAcceptanceErr::*;
216		match self {
217			MoreMessagesThanPermitted { sent, permitted } => write!(
218				fmt,
219				"more HRMP messages than permitted by config ({} > {})",
220				sent, permitted,
221			),
222			NotSorted { idx } => {
223				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224			},
225			NoSuchChannel { idx, channel_id } => write!(
226				fmt,
227				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228				idx, channel_id.sender, channel_id.recipient,
229			),
230			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231				fmt,
232				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233				idx, msg_size, max_size,
234			),
235			TotalSizeExceeded { idx, total_size, limit } => write!(
236				fmt,
237				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
238				idx, total_size, limit,
239			),
240			CapacityExceeded { idx, count, limit } => write!(
241				fmt,
242				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
243				idx, count, limit,
244			),
245		}
246	}
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251	use super::*;
252
	// The pallet type, generic over the runtime configuration `T`.
	//
	// NOTE(review): `without_storage_info` is presumably required because several storage items
	// of this pallet hold unbounded `Vec`s — confirm before attempting to remove it.
	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
256
	#[pallet::config]
	pub trait Config:
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The outer event type.
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

		/// The outer origin type.
		///
		/// It must be constructible from both the parachains `crate::Origin` and the system
		/// origin, and convertible back into a `crate::Origin`. The dispatchables of this pallet
		/// that must be called by a parachain convert the incoming origin into this type before
		/// passing it to `ensure_parachain`.
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

		/// The origin that can perform "force" actions on channels.
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

		/// An interface for reserving deposits for opening channels.
		///
		/// NOTE that this Currency instance will be charged with the amounts defined in the
		/// `Configuration` pallet. Specifically, that means that the `Balance` of the `Currency`
		/// implementation should be the same as `Balance` as used in the `Configuration`.
		type Currency: ReservableCurrency<Self::AccountId>;

		/// The default channel size and capacity to use when opening a channel to a system
		/// parachain.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

		/// Means of converting an `Xcm` into a `VersionedXcm`. This pallet sends HRMP XCM
		/// notifications to the channel-related parachains, while the `WrapVersion` implementation
		/// attempts to wrap them into the most suitable XCM version for the destination parachain.
		///
		/// NOTE: For example, `pallet_xcm` provides an accurate implementation (recommended), or
		/// the default `()` implementation uses the latest XCM version for all parachains.
		type VersionWrapper: xcm::WrapVersion;

		/// Something that provides the weight of this pallet.
		type WeightInfo: WeightInfo;
	}
293
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Open HRMP channel requested.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// A pending HRMP open channel request was canceled by one of its two parties;
		/// `by_parachain` is the para that canceled it.
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Open HRMP channel accepted.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// HRMP channel closed; `by_parachain` is the para that initiated the closure.
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// An HRMP channel was opened via the `ChannelManager` origin.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel was opened with a system chain.
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel's deposits were updated.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
327
	#[pallet::error]
	pub enum Error<T> {
		/// The sender tried to open a channel to themselves.
		OpenHrmpChannelToSelf,
		/// The recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		/// The requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		/// The requested capacity exceeds the global limit.
		OpenHrmpChannelCapacityExceedsLimit,
		/// The requested maximum message size is zero.
		OpenHrmpChannelZeroMessageSize,
		/// The requested maximum message size exceeds the global limit.
		OpenHrmpChannelMessageSizeExceedsLimit,
		/// The channel already exists.
		OpenHrmpChannelAlreadyExists,
		/// There is already a request to open the same channel.
		OpenHrmpChannelAlreadyRequested,
		/// The sender already has the maximum number of allowed outbound channels.
		OpenHrmpChannelLimitExceeded,
		/// The channel from the sender to the origin doesn't exist.
		AcceptHrmpChannelDoesntExist,
		/// The channel is already confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		/// The recipient already has the maximum number of allowed inbound channels.
		AcceptHrmpChannelLimitExceeded,
		/// The origin tries to close a channel where it is neither the sender nor the recipient.
		CloseHrmpChannelUnauthorized,
		/// The channel to be closed doesn't exist.
		CloseHrmpChannelDoesntExist,
		/// Closing of the channel has already been requested.
		CloseHrmpChannelAlreadyUnderway,
		/// Canceling is requested by neither the sender nor recipient of the open channel request.
		CancelHrmpOpenChannelUnauthorized,
		/// The open request doesn't exist.
		OpenHrmpChannelDoesntExist,
		/// Cannot cancel an HRMP open channel request because it is already confirmed.
		OpenHrmpChannelAlreadyConfirmed,
		/// The provided witness data is wrong.
		WrongWitness,
		/// The channel between these two chains cannot be authorized.
		ChannelCreationNotAuthorized,
	}
371
	/// The set of pending HRMP open channel requests.
	///
	/// The set is accompanied by a list for iteration.
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set, and vice versa.
	#[pallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;

	// The list that accompanies `HrmpOpenChannelRequests` for iteration.
	//
	// NOTE: could become bounded, but we don't have a global maximum for this.
	// `HRMP_MAX_INBOUND_CHANNELS_BOUND` is per parachain, while this storage tracks the
	// global state.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
388
	/// This mapping tracks how many open channel requests are initiated by a given sender para.
	/// Invariant: the number of items in `HrmpOpenChannelRequests` whose channel is `(X, _)`
	/// should be equal to the value of `HrmpOpenChannelRequestCount` for `X`.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
395
	/// This mapping tracks how many open channel requests were accepted by a given recipient para.
	/// Invariant: the number of items in `HrmpOpenChannelRequests` whose channel is `(_, X)` and
	/// that have `confirmed` set to true should be equal to the value of
	/// `HrmpAcceptedChannelRequestCount` for `X`.
	#[pallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
402
	/// A set of pending HRMP close channel requests that are going to be closed during the session
	/// change. Used for checking if a given channel is registered for closure.
	///
	/// The set is accompanied by a list for iteration.
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set, and vice versa.
	#[pallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;

	// The list that accompanies `HrmpCloseChannelRequests` for iteration.
	#[pallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
416
	/// The HRMP watermark associated with each para.
	///
	/// Invariant:
	/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a
	///   session.
	#[pallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
423
	/// HRMP channel metadata, keyed by the `(sender, recipient)` channel id.
	///
	/// Invariant:
	/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
	#[pallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
429
	/// Ingress/egress indexes allow finding all the senders and receivers given the opposite
	/// side. I.e.
	///
	/// (a) the ingress index allows finding all the senders for a given recipient.
	/// (b) the egress index allows finding all the recipients for a given sender.
	///
	/// Invariants:
	/// - for each ingress index entry for `P` each item `I` in the index should be present in
	///   `HrmpChannels` as `(I, P)`.
	/// - for each egress index entry for `P` each item `E` in the index should be present in
	///   `HrmpChannels` as `(P, E)`.
	/// - there should be no other dangling channels in `HrmpChannels`.
	/// - the vectors are sorted.
	#[pallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	// The egress counterpart of `HrmpIngressChannelsIndex`; see the documentation there.
	//
	// NOTE that this field is used by parachains via merkle storage proofs, therefore changing
	// the format will require migration of parachains.
	#[pallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
452
	/// Storage for the messages for each channel.
	///
	/// Invariant: must be empty if there is no corresponding channel in `HrmpChannels`.
	#[pallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;
463
	/// Maintains a mapping that can be used to answer the question: which paras sent a message
	/// at the given block number to a given receiver?
	///
	/// Invariants:
	/// - The inner `Vec<ParaId>` is never empty.
	/// - The inner `Vec<ParaId>` cannot contain the same `ParaId` twice.
	/// - The outer vector is sorted ascending by block number and cannot store two items with the
	///   same block number.
	#[pallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
473
	/// Preopen the given HRMP channels.
	///
	/// The values in each tuple correspond to
	/// `(sender, recipient, max_capacity, max_message_size)`, i.e. similar to `init_open_channel`.
	/// In fact, the initialization is performed as if `init_open_channel` and
	/// `accept_open_channel` were called with the respective parameters and the session change
	/// took place.
	///
	/// As such, each channel initializer should satisfy the same constraints, namely:
	///
	/// 1. `max_capacity` and `max_message_size` should be within the limits set by the
	///    configuration pallet.
	/// 2. `sender` and `recipient` must be valid paras.
	#[pallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		// Only ties the config to `T`; skipped by serde.
		#[serde(skip)]
		_config: core::marker::PhantomData<T>,
		// The channels to preopen, as `(sender, recipient, max_capacity, max_message_size)`.
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}
494
495	#[pallet::genesis_build]
496	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
497		fn build(&self) {
498			initialize_storage::<T>(&self.preopen_hrmp_channels);
499		}
500	}
501
502	#[pallet::call]
503	impl<T: Config> Pallet<T> {
504		/// Initiate opening a channel from a parachain to a given recipient with given channel
505		/// parameters.
506		///
507		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
508		/// - `proposed_max_message_size` - specifies the maximum size of the messages.
509		///
510		/// These numbers are a subject to the relay-chain configuration limits.
511		///
512		/// The channel can be opened only after the recipient confirms it and only on a session
513		/// change.
514		#[pallet::call_index(0)]
515		#[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
516		pub fn hrmp_init_open_channel(
517			origin: OriginFor<T>,
518			recipient: ParaId,
519			proposed_max_capacity: u32,
520			proposed_max_message_size: u32,
521		) -> DispatchResult {
522			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
523			Self::init_open_channel(
524				origin,
525				recipient,
526				proposed_max_capacity,
527				proposed_max_message_size,
528			)?;
529			Self::deposit_event(Event::OpenChannelRequested {
530				sender: origin,
531				recipient,
532				proposed_max_capacity,
533				proposed_max_message_size,
534			});
535			Ok(())
536		}
537
538		/// Accept a pending open channel request from the given sender.
539		///
540		/// The channel will be opened only on the next session boundary.
541		#[pallet::call_index(1)]
542		#[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
543		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
544			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
545			Self::accept_open_channel(origin, sender)?;
546			Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
547			Ok(())
548		}
549
550		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
551		/// recipient in the channel being closed.
552		///
553		/// The closure can only happen on a session change.
554		#[pallet::call_index(2)]
555		#[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
556		pub fn hrmp_close_channel(
557			origin: OriginFor<T>,
558			channel_id: HrmpChannelId,
559		) -> DispatchResult {
560			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
561			Self::close_channel(origin, channel_id.clone())?;
562			Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
563			Ok(())
564		}
565
566		/// This extrinsic triggers the cleanup of all the HRMP storage items that a para may have.
567		/// Normally this happens once per session, but this allows you to trigger the cleanup
568		/// immediately for a specific parachain.
569		///
570		/// Number of inbound and outbound channels for `para` must be provided as witness data.
571		///
572		/// Origin must be the `ChannelManager`.
573		#[pallet::call_index(3)]
574		#[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
575		pub fn force_clean_hrmp(
576			origin: OriginFor<T>,
577			para: ParaId,
578			num_inbound: u32,
579			num_outbound: u32,
580		) -> DispatchResult {
581			T::ChannelManager::ensure_origin(origin)?;
582
583			ensure!(
584				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
585					num_inbound as usize,
586				Error::<T>::WrongWitness
587			);
588			ensure!(
589				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
590					num_outbound as usize,
591				Error::<T>::WrongWitness
592			);
593
594			Self::clean_hrmp_after_outgoing(&para);
595			Ok(())
596		}
597
598		/// Force process HRMP open channel requests.
599		///
600		/// If there are pending HRMP open channel requests, you can use this function to process
601		/// all of those requests immediately.
602		///
603		/// Total number of opening channels must be provided as witness data.
604		///
605		/// Origin must be the `ChannelManager`.
606		#[pallet::call_index(4)]
607		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
608		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
609			T::ChannelManager::ensure_origin(origin)?;
610
611			ensure!(
612				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
613					channels,
614				Error::<T>::WrongWitness
615			);
616
617			let host_config = configuration::ActiveConfig::<T>::get();
618			Self::process_hrmp_open_channel_requests(&host_config);
619			Ok(())
620		}
621
622		/// Force process HRMP close channel requests.
623		///
624		/// If there are pending HRMP close channel requests, you can use this function to process
625		/// all of those requests immediately.
626		///
627		/// Total number of closing channels must be provided as witness data.
628		///
629		/// Origin must be the `ChannelManager`.
630		#[pallet::call_index(5)]
631		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
632		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
633			T::ChannelManager::ensure_origin(origin)?;
634
635			ensure!(
636				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
637					channels,
638				Error::<T>::WrongWitness
639			);
640
641			Self::process_hrmp_close_channel_requests();
642			Ok(())
643		}
644
645		/// This cancels a pending open channel request. It can be canceled by either of the sender
646		/// or the recipient for that request. The origin must be either of those.
647		///
648		/// The cancellation happens immediately. It is not possible to cancel the request if it is
649		/// already accepted.
650		///
651		/// Total number of open requests (i.e. `HrmpOpenChannelRequestsList`) must be provided as
652		/// witness data.
653		#[pallet::call_index(6)]
654		#[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
655		pub fn hrmp_cancel_open_request(
656			origin: OriginFor<T>,
657			channel_id: HrmpChannelId,
658			open_requests: u32,
659		) -> DispatchResult {
660			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
661			ensure!(
662				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
663					open_requests,
664				Error::<T>::WrongWitness
665			);
666			Self::cancel_open_request(origin, channel_id.clone())?;
667			Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
668			Ok(())
669		}
670
671		/// Open a channel from a `sender` to a `recipient` `ParaId`. Although opened by governance,
672		/// the `max_capacity` and `max_message_size` are still subject to the Relay Chain's
673		/// configured limits.
674		///
675		/// Expected use is when one (and only one) of the `ParaId`s involved in the channel is
676		/// governed by the system, e.g. a system parachain.
677		///
678		/// Origin must be the `ChannelManager`.
679		#[pallet::call_index(7)]
680		#[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
681		pub fn force_open_hrmp_channel(
682			origin: OriginFor<T>,
683			sender: ParaId,
684			recipient: ParaId,
685			max_capacity: u32,
686			max_message_size: u32,
687		) -> DispatchResultWithPostInfo {
688			T::ChannelManager::ensure_origin(origin)?;
689
690			// Guard against a common footgun where someone makes a channel request to a system
691			// parachain and then makes a proposal to open the channel via governance, which fails
692			// because `init_open_channel` fails if there is an existing request. This check will
693			// clear an existing request such that `init_open_channel` should otherwise succeed.
694			let channel_id = HrmpChannelId { sender, recipient };
695			let cancel_request: u32 =
696				if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
697					Self::cancel_open_request(sender, channel_id)?;
698					1
699				} else {
700					0
701				};
702
703			// Now we proceed with normal init/accept, except that we set `no_deposit` to true such
704			// that it will not require deposits from either member.
705			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
706			Self::accept_open_channel(recipient, sender)?;
707			Self::deposit_event(Event::HrmpChannelForceOpened {
708				sender,
709				recipient,
710				proposed_max_capacity: max_capacity,
711				proposed_max_message_size: max_message_size,
712			});
713
714			Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
715		}
716
717		/// Establish an HRMP channel between two system chains. If the channel does not already
718		/// exist, the transaction fees will be refunded to the caller. The system does not take
719		/// deposits for channels between system chains, and automatically sets the message number
720		/// and size limits to the maximum allowed by the network's configuration.
721		///
722		/// Arguments:
723		///
724		/// - `sender`: A system chain, `ParaId`.
725		/// - `recipient`: A system chain, `ParaId`.
726		///
727		/// Any signed origin can call this function, but _both_ inputs MUST be system chains. If
728		/// the channel does not exist yet, there is no fee.
729		#[pallet::call_index(8)]
730		#[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
731		pub fn establish_system_channel(
732			origin: OriginFor<T>,
733			sender: ParaId,
734			recipient: ParaId,
735		) -> DispatchResultWithPostInfo {
736			let _caller = ensure_signed(origin)?;
737
738			// both chains must be system
739			ensure!(
740				sender.is_system() && recipient.is_system(),
741				Error::<T>::ChannelCreationNotAuthorized
742			);
743
744			let config = configuration::ActiveConfig::<T>::get();
745			let max_message_size = config.hrmp_channel_max_message_size;
746			let max_capacity = config.hrmp_channel_max_capacity;
747
748			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
749			Self::accept_open_channel(recipient, sender)?;
750
751			Self::deposit_event(Event::HrmpSystemChannelOpened {
752				sender,
753				recipient,
754				proposed_max_capacity: max_capacity,
755				proposed_max_message_size: max_message_size,
756			});
757
758			Ok(Pays::No.into())
759		}
760
761		/// Update the deposits held for an HRMP channel to the latest `Configuration`. Channels
762		/// with system chains do not require a deposit.
763		///
764		/// Arguments:
765		///
766		/// - `sender`: A chain, `ParaId`.
767		/// - `recipient`: A chain, `ParaId`.
768		///
769		/// Any signed origin can call this function.
770		#[pallet::call_index(9)]
771		#[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
772		pub fn poke_channel_deposits(
773			origin: OriginFor<T>,
774			sender: ParaId,
775			recipient: ParaId,
776		) -> DispatchResult {
777			let _caller = ensure_signed(origin)?;
778			let channel_id = HrmpChannelId { sender, recipient };
779			let is_system = sender.is_system() || recipient.is_system();
780
781			let config = configuration::ActiveConfig::<T>::get();
782
783			// Channels with and amongst the system do not require a deposit.
784			let (new_sender_deposit, new_recipient_deposit) = if is_system {
785				(0, 0)
786			} else {
787				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
788			};
789
790			let _ = HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
791				if let Some(ref mut channel) = channel {
792					let current_sender_deposit = channel.sender_deposit;
793					let current_recipient_deposit = channel.recipient_deposit;
794
795					// nothing to update
796					if current_sender_deposit == new_sender_deposit &&
797						current_recipient_deposit == new_recipient_deposit
798					{
799						return Ok(())
800					}
801
802					// sender
803					if current_sender_deposit > new_sender_deposit {
804						// Can never underflow, but be paranoid.
805						let amount = current_sender_deposit
806							.checked_sub(new_sender_deposit)
807							.ok_or(ArithmeticError::Underflow)?;
808						T::Currency::unreserve(
809							&channel_id.sender.into_account_truncating(),
810							// The difference should always be convertible into `Balance`, but be
811							// paranoid and do nothing in case.
812							amount.try_into().unwrap_or(Zero::zero()),
813						);
814					} else if current_sender_deposit < new_sender_deposit {
815						let amount = new_sender_deposit
816							.checked_sub(current_sender_deposit)
817							.ok_or(ArithmeticError::Underflow)?;
818						T::Currency::reserve(
819							&channel_id.sender.into_account_truncating(),
820							amount.try_into().unwrap_or(Zero::zero()),
821						)?;
822					}
823
824					// recipient
825					if current_recipient_deposit > new_recipient_deposit {
826						let amount = current_recipient_deposit
827							.checked_sub(new_recipient_deposit)
828							.ok_or(ArithmeticError::Underflow)?;
829						T::Currency::unreserve(
830							&channel_id.recipient.into_account_truncating(),
831							amount.try_into().unwrap_or(Zero::zero()),
832						);
833					} else if current_recipient_deposit < new_recipient_deposit {
834						let amount = new_recipient_deposit
835							.checked_sub(current_recipient_deposit)
836							.ok_or(ArithmeticError::Underflow)?;
837						T::Currency::reserve(
838							&channel_id.recipient.into_account_truncating(),
839							amount.try_into().unwrap_or(Zero::zero()),
840						)?;
841					}
842
843					// update storage
844					channel.sender_deposit = new_sender_deposit;
845					channel.recipient_deposit = new_recipient_deposit;
846				} else {
847					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
848				}
849				Ok(())
850			})?;
851
852			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
853
854			Ok(())
855		}
856
857		/// Establish a bidirectional HRMP channel between a parachain and a system chain.
858		///
859		/// Arguments:
860		///
861		/// - `target_system_chain`: A system chain, `ParaId`.
862		///
863		/// The origin needs to be the parachain origin.
864		#[pallet::call_index(10)]
865		#[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
866		pub fn establish_channel_with_system(
867			origin: OriginFor<T>,
868			target_system_chain: ParaId,
869		) -> DispatchResultWithPostInfo {
870			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
871
872			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
873
874			let (max_message_size, max_capacity) =
875				T::DefaultChannelSizeAndCapacityWithSystem::get();
876
877			// create bidirectional channel
878			Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
879			Self::accept_open_channel(target_system_chain, sender)?;
880
881			Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
882			Self::accept_open_channel(sender, target_system_chain)?;
883
884			Self::deposit_event(Event::HrmpSystemChannelOpened {
885				sender,
886				recipient: target_system_chain,
887				proposed_max_capacity: max_capacity,
888				proposed_max_message_size: max_message_size,
889			});
890
891			Self::deposit_event(Event::HrmpSystemChannelOpened {
892				sender: target_system_chain,
893				recipient: sender,
894				proposed_max_capacity: max_capacity,
895				proposed_max_message_size: max_message_size,
896			});
897
898			Ok(Pays::No.into())
899		}
900	}
901}
902
903fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
904	let host_config = configuration::ActiveConfig::<T>::get();
905	for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
906		if let Err(err) =
907			preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
908		{
909			panic!("failed to initialize the genesis storage: {:?}", err);
910		}
911	}
912	Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
913}
914
915fn preopen_hrmp_channel<T: Config>(
916	sender: ParaId,
917	recipient: ParaId,
918	max_capacity: u32,
919	max_message_size: u32,
920) -> DispatchResult {
921	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
922	Pallet::<T>::accept_open_channel(recipient, sender)?;
923	Ok(())
924}
925
926/// Routines and getters related to HRMP.
927impl<T: Config> Pallet<T> {
928	/// Block initialization logic, called by initializer.
929	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
930		Weight::zero()
931	}
932
933	/// Block finalization logic, called by initializer.
934	pub(crate) fn initializer_finalize() {}
935
936	/// Called by the initializer to note that a new session has started.
937	pub(crate) fn initializer_on_new_session(
938		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
939		outgoing_paras: &[ParaId],
940	) -> Weight {
941		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
942		Self::process_hrmp_open_channel_requests(&notification.prev_config);
943		Self::process_hrmp_close_channel_requests();
944		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
945			outgoing_paras.len() as u32
946		))
947		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
948			outgoing_paras.len() as u32,
949		))
950	}
951
952	/// Iterate over all paras that were noted for offboarding and remove all the data
953	/// associated with them.
954	fn perform_outgoing_para_cleanup(
955		config: &HostConfiguration<BlockNumberFor<T>>,
956		outgoing: &[ParaId],
957	) -> Weight {
958		let mut w = Self::clean_open_channel_requests(config, outgoing);
959		for outgoing_para in outgoing {
960			Self::clean_hrmp_after_outgoing(outgoing_para);
961
962			// we need a few extra bits of data to weigh this -- all of this is read internally
963			// anyways, so no overhead.
964			let ingress_count =
965				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
966			let egress_count =
967				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
968			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
969				ingress_count,
970				egress_count,
971			));
972		}
973		w
974	}
975
976	// Go over the HRMP open channel requests and remove all in which offboarding paras participate.
977	//
978	// This will also perform the refunds for the counterparty if it doesn't offboard.
979	pub(crate) fn clean_open_channel_requests(
980		config: &HostConfiguration<BlockNumberFor<T>>,
981		outgoing: &[ParaId],
982	) -> Weight {
983		// First collect all the channel ids of the open requests in which there is at least one
984		// party presents in the outgoing list.
985		//
986		// Both the open channel request list and outgoing list are expected to be small enough.
987		// In the most common case there will be only single outgoing para.
988		let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
989		let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
990			.into_iter()
991			.partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
992		HrmpOpenChannelRequestsList::<T>::put(stay);
993
994		// Then iterate over all open requests to be removed, pull them out of the set and perform
995		// the refunds if applicable.
996		for req_id in go {
997			let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
998				Some(req_data) => req_data,
999				None => {
1000					// Can't normally happen but no need to panic.
1001					continue
1002				},
1003			};
1004
1005			// Return the deposit of the sender, but only if it is not the para being offboarded.
1006			if !outgoing.contains(&req_id.sender) {
1007				T::Currency::unreserve(
1008					&req_id.sender.into_account_truncating(),
1009					req_data.sender_deposit.unique_saturated_into(),
1010				);
1011			}
1012
1013			// If the request was confirmed, then it means it was confirmed in the finished session.
1014			// Therefore, the config's hrmp_recipient_deposit represents the actual value of the
1015			// deposit.
1016			//
1017			// We still want to refund the deposit only if the para is not being offboarded.
1018			if req_data.confirmed {
1019				if !outgoing.contains(&req_id.recipient) {
1020					T::Currency::unreserve(
1021						&req_id.recipient.into_account_truncating(),
1022						config.hrmp_recipient_deposit.unique_saturated_into(),
1023					);
1024				}
1025				Self::decrease_accepted_channel_request_count(req_id.recipient);
1026			}
1027		}
1028
1029		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1030	}
1031
1032	/// Remove all storage entries associated with the given para.
1033	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1034		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1035		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1036
1037		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1038			.into_iter()
1039			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1040		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1041			.into_iter()
1042			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1043		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1044		to_close.sort();
1045		to_close.dedup();
1046
1047		for channel in to_close {
1048			Self::close_hrmp_channel(&channel);
1049		}
1050	}
1051
1052	/// Iterate over all open channel requests and:
1053	///
1054	/// - prune the stale requests
1055	/// - enact the confirmed requests
1056	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1057		let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1058		if open_req_channels.is_empty() {
1059			return
1060		}
1061
1062		// iterate the vector starting from the end making our way to the beginning. This way we
1063		// can leverage `swap_remove` to efficiently remove an item during iteration.
1064		let mut idx = open_req_channels.len();
1065		loop {
1066			// bail if we've iterated over all items.
1067			if idx == 0 {
1068				break
1069			}
1070
1071			idx -= 1;
1072			let channel_id = open_req_channels[idx].clone();
1073			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1074				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
1075			);
1076
1077			let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1078			let sender_deposit = request.sender_deposit;
1079			let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1080
1081			if request.confirmed {
1082				if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1083					paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1084				{
1085					HrmpChannels::<T>::insert(
1086						&channel_id,
1087						HrmpChannel {
1088							sender_deposit,
1089							recipient_deposit,
1090							max_capacity: request.max_capacity,
1091							max_total_size: request.max_total_size,
1092							max_message_size: request.max_message_size,
1093							msg_count: 0,
1094							total_size: 0,
1095							mqc_head: None,
1096						},
1097					);
1098
1099					HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1100						if let Err(i) = v.binary_search(&channel_id.sender) {
1101							v.insert(i, channel_id.sender);
1102						}
1103					});
1104					HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1105						if let Err(i) = v.binary_search(&channel_id.recipient) {
1106							v.insert(i, channel_id.recipient);
1107						}
1108					});
1109				}
1110
1111				Self::decrease_open_channel_request_count(channel_id.sender);
1112				Self::decrease_accepted_channel_request_count(channel_id.recipient);
1113
1114				let _ = open_req_channels.swap_remove(idx);
1115				HrmpOpenChannelRequests::<T>::remove(&channel_id);
1116			}
1117		}
1118
1119		HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1120	}
1121
1122	/// Iterate over all close channel requests unconditionally closing the channels.
1123	fn process_hrmp_close_channel_requests() {
1124		let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1125		for condemned_ch_id in close_reqs {
1126			HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1127			Self::close_hrmp_channel(&condemned_ch_id);
1128		}
1129	}
1130
1131	/// Close and remove the designated HRMP channel.
1132	///
1133	/// This includes returning the deposits.
1134	///
1135	/// This function is idempotent, meaning that after the first application it should have no
1136	/// effect (i.e. it won't return the deposits twice).
1137	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1138		if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1139			HrmpChannels::<T>::take(channel_id)
1140		{
1141			T::Currency::unreserve(
1142				&channel_id.sender.into_account_truncating(),
1143				sender_deposit.unique_saturated_into(),
1144			);
1145			T::Currency::unreserve(
1146				&channel_id.recipient.into_account_truncating(),
1147				recipient_deposit.unique_saturated_into(),
1148			);
1149		}
1150
1151		HrmpChannelContents::<T>::remove(channel_id);
1152
1153		HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1154			if let Ok(i) = v.binary_search(&channel_id.recipient) {
1155				v.remove(i);
1156			}
1157		});
1158		HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1159			if let Ok(i) = v.binary_search(&channel_id.sender) {
1160				v.remove(i);
1161			}
1162		});
1163	}
1164
1165	/// Check that the candidate of the given recipient controls the HRMP watermark properly.
1166	pub(crate) fn check_hrmp_watermark(
1167		recipient: ParaId,
1168		relay_chain_parent_number: BlockNumberFor<T>,
1169		new_hrmp_watermark: BlockNumberFor<T>,
1170	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1171		// First, check where the watermark CANNOT legally land.
1172		//
1173		// (a) For ensuring that messages are eventually processed, we require each parablock's
1174		//     watermark to be greater than the last one. The exception to this is if the previous
1175		//     watermark was already equal to the current relay-parent number.
1176		//
1177		// (b) However, a parachain cannot read into "the future", therefore the watermark should
1178		//     not be greater than the relay-chain context block which the parablock refers to.
1179		if new_hrmp_watermark == relay_chain_parent_number {
1180			return Ok(())
1181		}
1182
1183		if new_hrmp_watermark > relay_chain_parent_number {
1184			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1185				new_watermark: new_hrmp_watermark,
1186				relay_chain_parent_number,
1187			})
1188		}
1189
1190		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1191			if new_hrmp_watermark <= last_watermark {
1192				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1193					new_watermark: new_hrmp_watermark,
1194					last_watermark,
1195				})
1196			}
1197		}
1198
1199		// Second, check where the watermark CAN land. It's one of the following:
1200		//
1201		// (a) The relay parent block number (checked above).
1202		// (b) A relay-chain block in which this para received at least one message (checked here)
1203		let digest = HrmpChannelDigests::<T>::get(&recipient);
1204		if !digest
1205			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1206			.is_ok()
1207		{
1208			return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1209				new_watermark: new_hrmp_watermark,
1210			})
1211		}
1212		Ok(())
1213	}
1214
1215	/// Returns HRMP watermarks of previously sent messages to a given para.
1216	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1217		HrmpChannelDigests::<T>::get(&recipient)
1218			.into_iter()
1219			.map(|(block_no, _)| block_no)
1220			.collect()
1221	}
1222
1223	pub(crate) fn check_outbound_hrmp(
1224		config: &HostConfiguration<BlockNumberFor<T>>,
1225		sender: ParaId,
1226		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1227	) -> Result<(), OutboundHrmpAcceptanceErr> {
1228		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1229			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1230				sent: out_hrmp_msgs.len() as u32,
1231				permitted: config.hrmp_max_message_num_per_candidate,
1232			})
1233		}
1234
1235		let mut last_recipient = None::<ParaId>;
1236
1237		for (idx, out_msg) in
1238			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1239		{
1240			match last_recipient {
1241				// the messages must be sorted in ascending order and there must be no two messages
1242				// sent to the same recipient. Thus we can check that every recipient is strictly
1243				// greater than the previous one.
1244				Some(last_recipient) if out_msg.recipient <= last_recipient =>
1245					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
1246				_ => last_recipient = Some(out_msg.recipient),
1247			}
1248
1249			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1250
1251			let channel = match HrmpChannels::<T>::get(&channel_id) {
1252				Some(channel) => channel,
1253				None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1254			};
1255
1256			let msg_size = out_msg.data.len() as u32;
1257			if msg_size > channel.max_message_size {
1258				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1259					idx,
1260					msg_size,
1261					max_size: channel.max_message_size,
1262				})
1263			}
1264
1265			let new_total_size = channel.total_size + out_msg.data.len() as u32;
1266			if new_total_size > channel.max_total_size {
1267				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1268					idx,
1269					total_size: new_total_size,
1270					limit: channel.max_total_size,
1271				})
1272			}
1273
1274			let new_msg_count = channel.msg_count + 1;
1275			if new_msg_count > channel.max_capacity {
1276				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1277					idx,
1278					count: new_msg_count,
1279					limit: channel.max_capacity,
1280				})
1281			}
1282		}
1283
1284		Ok(())
1285	}
1286
1287	/// Returns remaining outbound channels capacity in messages and in bytes per recipient para.
1288	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1289		let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1290		let mut remaining = Vec::with_capacity(recipients.len());
1291
1292		for recipient in recipients {
1293			let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1294				continue
1295			};
1296			remaining.push((
1297				recipient,
1298				(
1299					channel.max_capacity - channel.msg_count,
1300					channel.max_total_size - channel.total_size,
1301				),
1302			));
1303		}
1304
1305		remaining
1306	}
1307
1308	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1309		// sift through the incoming messages digest to collect the paras that sent at least one
1310		// message to this parachain between the old and new watermarks.
1311		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1312			let mut senders = BTreeSet::new();
1313			let mut leftover = Vec::with_capacity(digest.len());
1314			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1315				if block_no <= new_hrmp_watermark {
1316					senders.extend(paras_sent_msg);
1317				} else {
1318					leftover.push((block_no, paras_sent_msg));
1319				}
1320			}
1321			*digest = leftover;
1322			senders
1323		});
1324
1325		// having all senders we can trivially find out the channels which we need to prune.
1326		let channels_to_prune =
1327			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1328		for channel_id in channels_to_prune {
1329			// prune each channel up to the new watermark keeping track how many messages we removed
1330			// and what is the total byte size of them.
1331			let (mut pruned_cnt, mut pruned_size) = (0, 0);
1332
1333			let contents = HrmpChannelContents::<T>::get(&channel_id);
1334			let mut leftover = Vec::with_capacity(contents.len());
1335			for msg in contents {
1336				if msg.sent_at <= new_hrmp_watermark {
1337					pruned_cnt += 1;
1338					pruned_size += msg.data.len();
1339				} else {
1340					leftover.push(msg);
1341				}
1342			}
1343			if !leftover.is_empty() {
1344				HrmpChannelContents::<T>::insert(&channel_id, leftover);
1345			} else {
1346				HrmpChannelContents::<T>::remove(&channel_id);
1347			}
1348
1349			// update the channel metadata.
1350			HrmpChannels::<T>::mutate(&channel_id, |channel| {
1351				if let Some(ref mut channel) = channel {
1352					channel.msg_count -= pruned_cnt as u32;
1353					channel.total_size -= pruned_size as u32;
1354				}
1355			});
1356		}
1357
1358		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1359	}
1360
1361	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
1362	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1363		let now = frame_system::Pallet::<T>::block_number();
1364
1365		for out_msg in out_hrmp_msgs {
1366			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1367
1368			let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1369				Some(channel) => channel,
1370				None => {
1371					// apparently, that since acceptance of this candidate the recipient was
1372					// offboarded and the channel no longer exists.
1373					continue
1374				},
1375			};
1376
1377			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1378
1379			// book keeping
1380			channel.msg_count += 1;
1381			channel.total_size += inbound.data.len() as u32;
1382
1383			// compute the new MQC head of the channel
1384			let prev_head = channel.mqc_head.unwrap_or(Default::default());
1385			let new_head = BlakeTwo256::hash_of(&(
1386				prev_head,
1387				inbound.sent_at,
1388				T::Hashing::hash_of(&inbound.data),
1389			));
1390			channel.mqc_head = Some(new_head);
1391
1392			HrmpChannels::<T>::insert(&channel_id, channel);
1393			HrmpChannelContents::<T>::append(&channel_id, inbound);
1394
1395			// The digests are sorted in ascending by block number order. There are only two
1396			// possible scenarios here ("the current" is the block of candidate's inclusion):
1397			//
1398			// (a) It's the first time anybody sends a message to this recipient within this block.
1399			//     In this case, the digest vector would be empty or the block number of the latest
1400			//     entry is smaller than the current.
1401			//
1402			// (b) Somebody has already sent a message within the current block. That means that
1403			//     the block number of the latest entry is equal to the current.
1404			//
1405			// Note that having the latest entry greater than the current block number is a logical
1406			// error.
1407			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1408			if let Some(cur_block_digest) = recipient_digest
1409				.last_mut()
1410				.filter(|(block_no, _)| *block_no == now)
1411				.map(|(_, ref mut d)| d)
1412			{
1413				cur_block_digest.push(sender);
1414			} else {
1415				recipient_digest.push((now, vec![sender]));
1416			}
1417			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1418		}
1419	}
1420
1421	/// Initiate opening a channel from a parachain to a given recipient with given channel
1422	/// parameters. If neither chain is part of the system, then a deposit from the `Configuration`
1423	/// will be required for `origin` (the sender) upon opening the request and the `recipient` upon
1424	/// accepting it.
1425	///
1426	/// Basically the same as [`hrmp_init_open_channel`](Pallet::hrmp_init_open_channel) but
1427	/// intended for calling directly from other pallets rather than dispatched.
1428	pub fn init_open_channel(
1429		origin: ParaId,
1430		recipient: ParaId,
1431		proposed_max_capacity: u32,
1432		proposed_max_message_size: u32,
1433	) -> DispatchResult {
1434		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1435		ensure!(
1436			paras::Pallet::<T>::is_valid_para(recipient),
1437			Error::<T>::OpenHrmpChannelInvalidRecipient,
1438		);
1439
1440		let config = configuration::ActiveConfig::<T>::get();
1441		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1442		ensure!(
1443			proposed_max_capacity <= config.hrmp_channel_max_capacity,
1444			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1445		);
1446		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1447		ensure!(
1448			proposed_max_message_size <= config.hrmp_channel_max_message_size,
1449			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1450		);
1451
1452		let channel_id = HrmpChannelId { sender: origin, recipient };
1453		ensure!(
1454			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1455			Error::<T>::OpenHrmpChannelAlreadyRequested,
1456		);
1457		ensure!(
1458			HrmpChannels::<T>::get(&channel_id).is_none(),
1459			Error::<T>::OpenHrmpChannelAlreadyExists,
1460		);
1461
1462		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1463		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1464		let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1465		ensure!(
1466			egress_cnt + open_req_cnt < channel_num_limit,
1467			Error::<T>::OpenHrmpChannelLimitExceeded,
1468		);
1469
1470		// Do not require deposits for channels with or amongst the system.
1471		let is_system = origin.is_system() || recipient.is_system();
1472		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1473		if !deposit.is_zero() {
1474			T::Currency::reserve(
1475				&origin.into_account_truncating(),
1476				deposit.unique_saturated_into(),
1477			)?;
1478		}
1479
1480		// mutating storage directly now -- shall not bail henceforth.
1481
1482		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1483		HrmpOpenChannelRequests::<T>::insert(
1484			&channel_id,
1485			HrmpOpenChannelRequest {
1486				confirmed: false,
1487				_age: 0,
1488				sender_deposit: deposit,
1489				max_capacity: proposed_max_capacity,
1490				max_message_size: proposed_max_message_size,
1491				max_total_size: config.hrmp_channel_max_total_size,
1492			},
1493		);
1494		HrmpOpenChannelRequestsList::<T>::append(channel_id);
1495
1496		Self::send_to_para(
1497			"init_open_channel",
1498			&config,
1499			recipient,
1500			Self::wrap_notification(|| {
1501				use xcm::opaque::latest::{prelude::*, Xcm};
1502				Xcm(vec![HrmpNewChannelOpenRequest {
1503					sender: origin.into(),
1504					max_capacity: proposed_max_capacity,
1505					max_message_size: proposed_max_message_size,
1506				}])
1507			}),
1508		);
1509
1510		Ok(())
1511	}
1512
1513	/// Accept a pending open channel request from the given sender.
1514	///
1515	/// Basically the same as [`hrmp_accept_open_channel`](Pallet::hrmp_accept_open_channel) but
1516	/// intended for calling directly from other pallets rather than dispatched.
1517	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1518		let channel_id = HrmpChannelId { sender, recipient: origin };
1519		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1520			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1521		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1522
1523		// check if by accepting this open channel request, this parachain would exceed the
1524		// number of inbound channels.
1525		let config = configuration::ActiveConfig::<T>::get();
1526		let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1527		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1528		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1529		ensure!(
1530			ingress_cnt + accepted_cnt < channel_num_limit,
1531			Error::<T>::AcceptHrmpChannelLimitExceeded,
1532		);
1533
1534		// Do not require deposits for channels with or amongst the system.
1535		let is_system = origin.is_system() || sender.is_system();
1536		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1537		if !deposit.is_zero() {
1538			T::Currency::reserve(
1539				&origin.into_account_truncating(),
1540				deposit.unique_saturated_into(),
1541			)?;
1542		}
1543
1544		// persist the updated open channel request and then increment the number of accepted
1545		// channels.
1546		channel_req.confirmed = true;
1547		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1548		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1549
1550		Self::send_to_para(
1551			"accept_open_channel",
1552			&config,
1553			sender,
1554			Self::wrap_notification(|| {
1555				use xcm::opaque::latest::{prelude::*, Xcm};
1556				Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1557			}),
1558		);
1559
1560		Ok(())
1561	}
1562
1563	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1564		// check if the origin is allowed to close the channel.
1565		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1566
1567		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1568			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1569		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1570
1571		// Remove the request by the channel id and sync the accompanying list with the set.
1572		HrmpOpenChannelRequests::<T>::remove(&channel_id);
1573		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1574			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1575				open_req_channels.swap_remove(pos);
1576			}
1577		});
1578
1579		Self::decrease_open_channel_request_count(channel_id.sender);
1580		// Don't decrease `HrmpAcceptedChannelRequestCount` because we don't consider confirmed
1581		// requests here.
1582
1583		// Unreserve the sender's deposit. The recipient could not have left their deposit because
1584		// we ensured that the request is not confirmed.
1585		T::Currency::unreserve(
1586			&channel_id.sender.into_account_truncating(),
1587			open_channel_req.sender_deposit.unique_saturated_into(),
1588		);
1589
1590		Ok(())
1591	}
1592
1593	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1594		// check if the origin is allowed to close the channel.
1595		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1596
1597		// check if the channel requested to close does exist.
1598		ensure!(
1599			HrmpChannels::<T>::get(&channel_id).is_some(),
1600			Error::<T>::CloseHrmpChannelDoesntExist,
1601		);
1602
1603		// check that there is no outstanding close request for this channel
1604		ensure!(
1605			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1606			Error::<T>::CloseHrmpChannelAlreadyUnderway,
1607		);
1608
1609		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1610		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1611
1612		let config = configuration::ActiveConfig::<T>::get();
1613		let opposite_party =
1614			if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1615
1616		Self::send_to_para(
1617			"close_channel",
1618			&config,
1619			opposite_party,
1620			Self::wrap_notification(|| {
1621				use xcm::opaque::latest::{prelude::*, Xcm};
1622				Xcm(vec![HrmpChannelClosing {
1623					initiator: origin.into(),
1624					sender: channel_id.sender.into(),
1625					recipient: channel_id.recipient.into(),
1626				}])
1627			}),
1628		);
1629
1630		Ok(())
1631	}
1632
/// Returns the MQC heads of the inbound channels of the given recipient para, each paired
/// with the para id of the channel's sender. The vector is sorted ascending by the para id
/// and doesn't contain multiple entries with the same sender.
///
/// A channel without metadata, or without an MQC head, is reported with the default hash.
#[cfg(test)]
fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
	// The result follows the order of the ingress index, which is expected to be sorted, so
	// no extra sorting is done here.
	HrmpIngressChannelsIndex::<T>::get(&recipient)
		.into_iter()
		.map(|sender| {
			let head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
				.and_then(|channel| channel.mqc_head)
				.unwrap_or_default();
			(sender, head)
		})
		.collect()
}
1652
1653	/// Returns contents of all channels addressed to the given recipient. Channels that have no
1654	/// messages in them are also included.
1655	pub(crate) fn inbound_hrmp_channels_contents(
1656		recipient: ParaId,
1657	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1658		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1659
1660		let mut inbound_hrmp_channels_contents = BTreeMap::new();
1661		for sender in sender_set {
1662			let channel_contents =
1663				HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1664			inbound_hrmp_channels_contents.insert(sender, channel_contents);
1665		}
1666
1667		inbound_hrmp_channels_contents
1668	}
1669}
1670
1671impl<T: Config> Pallet<T> {
1672	/// Decreases the open channel request count for the given sender. If the value reaches zero
1673	/// it is removed completely.
1674	fn decrease_open_channel_request_count(sender: ParaId) {
1675		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1676			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1677				0 => None,
1678				n => Some(n),
1679			});
1680		});
1681	}
1682
1683	/// Decreases the accepted channel request count for the given sender. If the value reaches
1684	/// zero it is removed completely.
1685	fn decrease_accepted_channel_request_count(recipient: ParaId) {
1686		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1687			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1688				0 => None,
1689				n => Some(n),
1690			});
1691		});
1692	}
1693
1694	#[cfg(any(feature = "runtime-benchmarks", test))]
1695	fn assert_storage_consistency_exhaustive() {
1696		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1697			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1698		}
1699
1700		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1701			for para in paras {
1702				assert!(
1703					crate::paras::Pallet::<T>::is_valid_para(para),
1704					"{}: {:?} para is offboarded",
1705					cause,
1706					para
1707				);
1708			}
1709		};
1710
1711		assert_eq!(
1712			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1713			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1714		);
1715
1716		// verify that the set of keys in `HrmpOpenChannelRequestCount` corresponds to the set
1717		// of _senders_ in `HrmpOpenChannelRequests`.
1718		//
1719		// having ensured that, we can go ahead and go over all counts and verify that they match.
1720		assert_eq!(
1721			HrmpOpenChannelRequestCount::<T>::iter()
1722				.map(|(k, _)| k)
1723				.collect::<BTreeSet<_>>(),
1724			HrmpOpenChannelRequests::<T>::iter()
1725				.map(|(k, _)| k.sender)
1726				.collect::<BTreeSet<_>>(),
1727		);
1728		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1729			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1730				.filter(|(ch, _)| ch.sender == open_channel_initiator)
1731				.count() as u32;
1732			assert_eq!(expected_num, actual_num);
1733		}
1734
1735		// The same as above, but for accepted channel request count. Note that we are interested
1736		// only in confirmed open requests.
1737		assert_eq!(
1738			HrmpAcceptedChannelRequestCount::<T>::iter()
1739				.map(|(k, _)| k)
1740				.collect::<BTreeSet<_>>(),
1741			HrmpOpenChannelRequests::<T>::iter()
1742				.filter(|(_, v)| v.confirmed)
1743				.map(|(k, _)| k.recipient)
1744				.collect::<BTreeSet<_>>(),
1745		);
1746		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1747			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1748				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1749				.count() as u32;
1750			assert_eq!(expected_num, actual_num);
1751		}
1752
1753		assert_eq!(
1754			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1755			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1756		);
1757
1758		// A HRMP watermark can be None for an onboarded parachain. However, an offboarded parachain
1759		// cannot have an HRMP watermark: it should've been cleanup.
1760		assert_contains_only_onboarded(
1761			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1762			"HRMP watermarks should contain only onboarded paras",
1763		);
1764
1765		// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
1766		// have contents.
1767		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1768			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1769
1770			// pedantic check: there should be no empty vectors in storage, those should be modeled
1771			// by a removed kv pair.
1772			assert!(!contents.is_empty());
1773		}
1774
1775		// Senders and recipients must be onboarded. Otherwise, all channels associated with them
1776		// are removed.
1777		assert_contains_only_onboarded(
1778			HrmpChannels::<T>::iter()
1779				.flat_map(|(k, _)| vec![k.sender, k.recipient])
1780				.collect::<Vec<_>>(),
1781			"senders and recipients in all channels should be onboarded",
1782		);
1783
1784		// Check the docs for `HrmpIngressChannelsIndex` and `HrmpEgressChannelsIndex` in decl
1785		// storage to get an index what are the channel mappings indexes.
1786		//
1787		// Here, from indexes.
1788		//
1789		// ingress         egress
1790		//
1791		// a -> [x, y]     x -> [a, b]
1792		// b -> [x, z]     y -> [a]
1793		//                 z -> [b]
1794		//
1795		// we derive a list of channels they represent.
1796		//
1797		//   (a, x)         (a, x)
1798		//   (a, y)         (a, y)
1799		//   (b, x)         (b, x)
1800		//   (b, z)         (b, z)
1801		//
1802		// and then that we compare that to the channel list in the `HrmpChannels`.
1803		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1804			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1805			.collect::<BTreeSet<_>>();
1806		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1807			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1808			.collect::<BTreeSet<_>>();
1809		let channel_set_ground_truth = HrmpChannels::<T>::iter()
1810			.map(|(k, _)| (k.sender, k.recipient))
1811			.collect::<BTreeSet<_>>();
1812		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1813		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1814
1815		HrmpIngressChannelsIndex::<T>::iter()
1816			.map(|(_, v)| v)
1817			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1818		HrmpEgressChannelsIndex::<T>::iter()
1819			.map(|(_, v)| v)
1820			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1821
1822		assert_contains_only_onboarded(
1823			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1824			"HRMP channel digests should contain only onboarded paras",
1825		);
1826		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1827			// Assert that items are in **strictly** ascending order. The strictness also implies
1828			// there are no duplicates.
1829			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1830
1831			for (_, mut senders) in digest {
1832				assert!(!senders.is_empty());
1833
1834				// check for duplicates. For that we sort the vector, then perform deduplication.
1835				// if the vector stayed the same, there are no duplicates.
1836				senders.sort();
1837				let orig_senders = senders.clone();
1838				senders.dedup();
1839				assert_eq!(
1840					orig_senders, senders,
1841					"duplicates removed implies existence of duplicates"
1842				);
1843			}
1844		}
1845	}
1846}
1847
1848impl<T: Config> Pallet<T> {
1849	/// Wraps HRMP XCM notifications to the most suitable XCM version for the destination para.
1850	/// If the XCM version is unknown, the latest XCM version is used as a best effort.
1851	fn wrap_notification(
1852		mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1853	) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1854		use xcm::{
1855			opaque::VersionedXcm,
1856			prelude::{Junction, Location},
1857			WrapVersion,
1858		};
1859
1860		// Return a closure that can prepare notifications.
1861		move |dest| {
1862			// Attempt to wrap the notification for the destination parachain.
1863			T::VersionWrapper::wrap_version(
1864				&Location::new(0, [Junction::Parachain(dest.into())]),
1865				notification(),
1866			)
1867			.unwrap_or_else(|_| {
1868				// As a best effort, if we cannot resolve the version, fallback to using the latest
1869				// version.
1870				VersionedXcm::from(notification())
1871			})
1872			.encode()
1873		}
1874	}
1875
1876	/// Sends/enqueues notification to the destination parachain.
1877	fn send_to_para(
1878		log_label: &str,
1879		config: &HostConfiguration<BlockNumberFor<T>>,
1880		dest: ParaId,
1881		notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1882	) {
1883		// prepare notification
1884		let notification_bytes = notification_bytes_for(dest);
1885
1886		// try to enqueue
1887		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1888			dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1889		{
1890			// this should never happen unless the max downward message size is configured to a
1891			// jokingly small number.
1892			log::error!(
1893				target: "runtime::hrmp",
1894				"sending '{log_label}::notification_bytes' failed."
1895			);
1896			debug_assert!(false);
1897		}
1898	}
1899}