pezkuwi_runtime_teyrchains/
hrmp.rs

1// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
2// This file is part of Pezkuwi.
3
4// Pezkuwi is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Pezkuwi is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Pezkuwi.  If not, see <http://www.gnu.org/licenses/>.
16
17use crate::{
18	configuration::{self, HostConfiguration},
19	dmp, ensure_teyrchain, initializer, paras,
20};
21use alloc::{
22	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23	vec,
24	vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use pezframe_support::{pezpallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use pezframe_system::pezpallet_prelude::*;
30use pezkuwi_primitives::{
31	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
32	SessionIndex,
33};
34use pezkuwi_teyrchain_primitives::primitives::{HorizontalMessages, IsSystem};
35use pezsp_runtime::{
36	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
37	ArithmeticError,
38};
39use scale_info::TypeInfo;
40
41pub use pezpallet::*;
42
/// Maximum bound that can be set for inbound channels.
///
/// If inaccurate, the weighing of this pezpallet might become inaccurate. The `configuration`
/// pezpallet is expected to check values against this bound before setting them.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Same as [`HRMP_MAX_INBOUND_CHANNELS_BOUND`], but for outbound channels.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions used by this pezpallet.
///
/// Most functions share their name with the dispatchable whose weight they report.
pub trait WeightInfo {
	/// Weight of `hrmp_init_open_channel`.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of `hrmp_accept_open_channel`.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of `hrmp_close_channel`.
	fn hrmp_close_channel() -> Weight;
	/// Weight of `force_clean_hrmp`; the call passes its inbound (`i`) and outbound (`e`) channel
	/// count witnesses.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of `force_process_hrmp_open`; `c` is the witnessed number of open requests.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of `force_process_hrmp_close`; `c` is the witnessed number of close requests.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of `hrmp_cancel_open_request`; `c` is the witnessed number of open requests.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	/// NOTE(review): no call site is visible in this part of the file; `c` presumably counts the
	/// open channel requests being cleaned — confirm against the caller.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of `force_open_hrmp_channel`; the call passes `1` when an existing open request
	/// had to be canceled first and `0` otherwise (`1` is used for the pre-dispatch weight).
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of `establish_system_channel`.
	fn establish_system_channel() -> Weight;
	/// Weight of `poke_channel_deposits`.
	fn poke_channel_deposits() -> Weight;
	/// NOTE(review): the matching call is not visible in this part of the file; presumably the
	/// weight of an `establish_channel_with_system` dispatchable — confirm.
	fn establish_channel_with_system() -> Weight;
}
71
72/// A weight info that is only suitable for testing.
73pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76	fn hrmp_accept_open_channel() -> Weight {
77		Weight::MAX
78	}
79	fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80		Weight::MAX
81	}
82	fn force_process_hrmp_close(_: u32) -> Weight {
83		Weight::MAX
84	}
85	fn force_process_hrmp_open(_: u32) -> Weight {
86		Weight::MAX
87	}
88	fn hrmp_cancel_open_request(_: u32) -> Weight {
89		Weight::MAX
90	}
91	fn hrmp_close_channel() -> Weight {
92		Weight::MAX
93	}
94	fn hrmp_init_open_channel() -> Weight {
95		Weight::MAX
96	}
97	fn clean_open_channel_requests(_: u32) -> Weight {
98		Weight::MAX
99	}
100	fn force_open_hrmp_channel(_: u32) -> Weight {
101		Weight::MAX
102	}
103	fn establish_system_channel() -> Weight {
104		Weight::MAX
105	}
106	fn poke_channel_deposits() -> Weight {
107		Weight::MAX
108	}
109	fn establish_channel_with_system() -> Weight {
110		Weight::MAX
111	}
112}
113
/// A description of a request to open an HRMP channel.
///
/// Values of this type are stored in `HrmpOpenChannelRequests`, keyed by the `HrmpChannelId` of
/// the channel being requested.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Indicates if this request was confirmed by the recipient.
	pub confirmed: bool,
	/// NOTE: this field is deprecated. Channel open requests became non-expiring and this value
	/// became unused. It is still a regular field of the struct and so remains part of its
	/// encoding.
	pub _age: SessionIndex,
	/// The amount that the sender supplied at the time of creation of this request.
	pub sender_deposit: Balance,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
}
131
/// The metadata of an HRMP channel.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	// NOTE: This structure is used by teyrchains via merkle proofs. Therefore, this struct
	// requires special treatment.
	//
	// A teyrchain that requests this struct can only depend on a subset of it. Specifically,
	// only the first few fields can be depended upon (see `AbridgedHrmpChannel`).
	// These fields cannot be changed without a corresponding migration of teyrchains.
	/// The maximum number of messages that can be pending in the channel at once.
	pub max_capacity: u32,
	/// The maximum total size of the messages that can be pending in the channel at once.
	pub max_total_size: u32,
	/// The maximum message size that could be put into the channel.
	pub max_message_size: u32,
	/// The current number of messages pending in the channel.
	/// Invariant: should be less or equal to `max_capacity`.
	pub msg_count: u32,
	/// The total size in bytes of all message payloads in the channel.
	/// Invariant: should be less or equal to `max_total_size`.
	pub total_size: u32,
	/// A head of the Message Queue Chain for this channel. Each link in this chain has a form:
	/// `(prev_head, B, H(M))`, where
	/// - `prev_head`: is the previous value of `mqc_head` or zero if none.
	/// - `B`: is the [relay-chain] block number in which a message was appended
	/// - `H(M)`: is the hash of the message being appended.
	/// This value is initialized to a special value that consists of all zeroes which indicates
	/// that no messages were previously added.
	///
	/// NOTE(review): the field is an `Option`, so "no messages yet" may be represented as `None`
	/// rather than as an all-zero hash — confirm against the code that creates channels.
	pub mqc_head: Option<Hash>,
	/// The amount that the sender supplied as a deposit when opening this channel.
	pub sender_deposit: Balance,
	/// The amount that the recipient supplied as a deposit when accepting opening this channel.
	pub recipient_deposit: Balance,
}
167
/// An error returned by [`Pezpallet::check_hrmp_watermark`] that indicates an acceptance criteria
/// check didn't pass.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is not advanced relative to the last recorded watermark.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is ahead of the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark doesn't land on a block in which messages were received.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// An error returned by [`Pezpallet::check_outbound_hrmp`] that indicates an acceptance criteria
/// check didn't pass.
///
/// Where present, `idx` is the index of the offending message in the submitted batch.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// More messages were sent than the configuration permits.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// The messages are not sorted; `idx` is the first unsorted one.
	NotSorted { idx: u32 },
	/// The message is addressed to a channel that doesn't exist.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message exceeds the negotiated maximum message size of the channel.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Sending the message would exceed the negotiated total size of the channel.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Sending the message would exceed the negotiated capacity of the channel.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189	BlockNumber: fmt::Debug,
190{
191	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192		use HrmpWatermarkAcceptanceErr::*;
193		match self {
194			AdvancementRule { new_watermark, last_watermark } => write!(
195				fmt,
196				"the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197				new_watermark, last_watermark,
198			),
199			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200				fmt,
201				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202				new_watermark, relay_chain_parent_number
203			),
204			LandsOnBlockWithNoMessages { new_watermark } => write!(
205				fmt,
206				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
207				new_watermark
208			),
209		}
210	}
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215		use OutboundHrmpAcceptanceErr::*;
216		match self {
217			MoreMessagesThanPermitted { sent, permitted } => write!(
218				fmt,
219				"more HRMP messages than permitted by config ({} > {})",
220				sent, permitted,
221			),
222			NotSorted { idx } => {
223				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224			},
225			NoSuchChannel { idx, channel_id } => write!(
226				fmt,
227				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228				idx, channel_id.sender, channel_id.recipient,
229			),
230			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231				fmt,
232				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233				idx, msg_size, max_size,
234			),
235			TotalSizeExceeded { idx, total_size, limit } => write!(
236				fmt,
237				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
238				idx, total_size, limit,
239			),
240			CapacityExceeded { idx, count, limit } => write!(
241				fmt,
242				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
243				idx, count, limit,
244			),
245		}
246	}
247}
248
249#[pezframe_support::pezpallet]
250pub mod pezpallet {
251	use super::*;
252
	/// The HRMP pezpallet type; its dispatchables and storage are parameterized by a runtime `T`
	/// implementing [`Config`].
	#[pezpallet::pezpallet]
	#[pezpallet::without_storage_info]
	pub struct Pezpallet<T>(_);
256
	/// Configuration of the HRMP pezpallet.
	#[pezpallet::config]
	pub trait Config:
		pezframe_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The outer event type.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>>
			+ IsType<<Self as pezframe_system::Config>::RuntimeEvent>;

		/// The outer origin type.
		///
		/// It must be constructible from both this crate's `Origin` and the system
		/// `RuntimeOrigin`, and fallibly convertible back into `crate::Origin`. The dispatchables
		/// in this pezpallet convert the incoming origin into this type before handing it to
		/// `ensure_teyrchain`.
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as pezframe_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

		/// The origin that can perform "force" actions on channels.
		type ChannelManager: EnsureOrigin<<Self as pezframe_system::Config>::RuntimeOrigin>;

		/// An interface for reserving deposits for opening channels.
		///
		/// NOTE that this Currency instance will be charged with the amounts defined in the
		/// `Configuration` pezpallet. Specifically, that means that the `Balance` of the `Currency`
		/// implementation should be the same as `Balance` as used in the `Configuration`.
		type Currency: ReservableCurrency<Self::AccountId>;

		/// The default channel size and capacity to use when opening a channel to a system
		/// teyrchain.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

		/// Means of converting an `Xcm` into a `VersionedXcm`. This pezpallet sends HRMP XCM
		/// notifications to the channel-related teyrchains, while the `WrapVersion` implementation
		/// attempts to wrap them into the most suitable XCM version for the destination teyrchain.
		///
		/// NOTE: For example, `pezpallet_xcm` provides an accurate implementation (recommended), or
		/// the default `()` implementation uses the latest XCM version for all teyrchains.
		type VersionWrapper: xcm::WrapVersion;

		/// Something that provides the weight of this pezpallet.
		type WeightInfo: WeightInfo;
	}
295
	#[pezpallet::event]
	#[pezpallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Open HRMP channel requested.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// A pending HRMP open channel request was canceled by either its sender or its
		/// recipient; `by_teyrchain` is the para that canceled it.
		OpenChannelCanceled { by_teyrchain: ParaId, channel_id: HrmpChannelId },
		/// Open HRMP channel accepted.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// HRMP channel closed; `by_teyrchain` is the para that requested the closure.
		ChannelClosed { by_teyrchain: ParaId, channel_id: HrmpChannelId },
		/// An HRMP channel was force-opened via the `ChannelManager` origin.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel was opened with a system chain.
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// An HRMP channel's deposits were updated.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
329
	#[pezpallet::error]
	pub enum Error<T> {
		/// The sender tried to open a channel to themselves.
		OpenHrmpChannelToSelf,
		/// The recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		/// The requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		/// The requested capacity exceeds the global limit.
		OpenHrmpChannelCapacityExceedsLimit,
		/// The requested maximum message size is zero.
		OpenHrmpChannelZeroMessageSize,
		/// The open request asked for a message size that exceeds the global limit.
		OpenHrmpChannelMessageSizeExceedsLimit,
		/// The channel already exists.
		OpenHrmpChannelAlreadyExists,
		/// There is already a request to open the same channel.
		OpenHrmpChannelAlreadyRequested,
		/// The sender already has the maximum number of allowed outbound channels.
		OpenHrmpChannelLimitExceeded,
		/// The channel from the sender to the origin doesn't exist.
		AcceptHrmpChannelDoesntExist,
		/// The channel is already confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		/// The recipient already has the maximum number of allowed inbound channels.
		AcceptHrmpChannelLimitExceeded,
		/// The origin tries to close a channel where it is neither the sender nor the recipient.
		CloseHrmpChannelUnauthorized,
		/// The channel to be closed doesn't exist.
		CloseHrmpChannelDoesntExist,
		/// The closure of this channel has already been requested.
		CloseHrmpChannelAlreadyUnderway,
		/// Canceling is requested by neither the sender nor recipient of the open channel request.
		CancelHrmpOpenChannelUnauthorized,
		/// The open request doesn't exist.
		OpenHrmpChannelDoesntExist,
		/// Cannot cancel an HRMP open channel request because it is already confirmed.
		OpenHrmpChannelAlreadyConfirmed,
		/// The provided witness data is wrong.
		WrongWitness,
		/// The channel between these two chains cannot be authorized.
		ChannelCreationNotAuthorized,
	}
373
	/// The set of pending HRMP open channel requests.
	///
	/// The set is accompanied by a list for iteration.
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set and vice versa.
	#[pezpallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;

	/// The list that accompanies `HrmpOpenChannelRequests` for iteration.
	// NOTE: could become bounded, but we don't have a global maximum for this.
	// `HRMP_MAX_INBOUND_CHANNELS_BOUND` are per teyrchain, while this storage tracks the
	// global state.
	#[pezpallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

	/// This mapping tracks how many open channel requests are initiated by a given sender para.
	/// Invariant: the number of items in `HrmpOpenChannelRequests` of the form `(X, _)` should
	/// equal the `HrmpOpenChannelRequestCount` for `X`.
	#[pezpallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

	/// This mapping tracks how many open channel requests were accepted by a given recipient para.
	/// Invariant: `HrmpOpenChannelRequests` should contain the same number of items `(_, X)` with
	/// `confirmed` set to true, as the number of `HrmpAcceptedChannelRequestCount` for `X`.
	#[pezpallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
404
	/// A set of pending HRMP close channel requests that are going to be closed during the session
	/// change. Used for checking if a given channel is registered for closure.
	///
	/// The set is accompanied by a list for iteration.
	///
	/// Invariant:
	/// - There are no channels that exist in the list but not in the set and vice versa.
	#[pezpallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;

	/// The list that accompanies `HrmpCloseChannelRequests` for iteration.
	#[pezpallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
418
	/// The HRMP watermark associated with each para, stored as a block number.
	/// Invariant:
	/// - each para `P` used here as a key should satisfy `Paras::is_valid_para(P)` within a
	///   session.
	#[pezpallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;

	/// HRMP channel metadata, keyed by channel id (the sender/recipient pair).
	/// Invariant:
	/// - each participant in the channel should satisfy `Paras::is_valid_para(P)` within a session.
	#[pezpallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
431
	/// Ingress/egress indexes allow to find all the senders and receivers given the opposite side.
	/// I.e.
	///
	/// (a) ingress index allows to find all the senders for a given recipient.
	/// (b) egress index allows to find all the recipients for a given sender.
	///
	/// Invariants:
	/// - for each ingress index entry for `P` each item `I` in the index should be present in
	///   `HrmpChannels` as `(I, P)`.
	/// - for each egress index entry for `P` each item `E` in the index should be present in
	///   `HrmpChannels` as `(P, E)`.
	/// - there should be no other dangling channels in `HrmpChannels`.
	/// - the vectors are sorted.
	#[pezpallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

	/// The egress counterpart of `HrmpIngressChannelsIndex`: for a given sender, all of its
	/// recipients. The invariants documented on `HrmpIngressChannelsIndex` apply.
	// NOTE that this field is used by teyrchains via merkle storage proofs, therefore changing
	// the format will require migration of teyrchains.
	#[pezpallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
454
	/// Storage for the messages for each channel.
	/// Invariant: cannot be non-empty if the corresponding channel in `HrmpChannels` is `None`.
	#[pezpallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;

	/// Maintains a mapping that can be used to answer the question: which paras sent a message at
	/// the given block number to a given receiver. The key is the receiver; each entry of the
	/// outer vector pairs a block number with the senders for that block.
	///
	/// Invariants:
	/// - The inner `Vec<ParaId>` is never empty.
	/// - The inner `Vec<ParaId>` cannot store two same `ParaId`.
	/// - The outer vector is sorted ascending by block number and cannot store two items with the
	///   same block number.
	#[pezpallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
475
	/// Preopen the given HRMP channels.
	///
	/// The values in each tuple correspond to
	/// `(sender, recipient, max_capacity, max_message_size)`, i.e. similar to `init_open_channel`.
	/// In fact, the initialization is performed as if `init_open_channel` and
	/// `accept_open_channel` were called with the respective parameters and the session change
	/// took place.
	///
	/// As such, each channel initializer should satisfy the same constraints, namely:
	///
	/// 1. `max_capacity` and `max_message_size` should be within the limits set by the
	///    configuration pezpallet.
	/// 2. `sender` and `recipient` must be valid paras.
	#[pezpallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		/// Marker tying the genesis config to the runtime `T`; skipped by serde.
		#[serde(skip)]
		_config: core::marker::PhantomData<T>,
		/// Channels to preopen, as `(sender, recipient, max_capacity, max_message_size)` tuples.
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}
496
	#[pezpallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		/// Builds the genesis state by handing the configured `preopen_hrmp_channels` to
		/// `initialize_storage` (defined elsewhere in this file).
		fn build(&self) {
			initialize_storage::<T>(&self.preopen_hrmp_channels);
		}
	}
503
504	#[pezpallet::call]
505	impl<T: Config> Pezpallet<T> {
506		/// Initiate opening a channel from a teyrchain to a given recipient with given channel
507		/// parameters.
508		///
509		/// - `proposed_max_capacity` - specifies how many messages can be in the channel at once.
510		/// - `proposed_max_message_size` - specifies the maximum size of the messages.
511		///
512		/// These numbers are a subject to the relay-chain configuration limits.
513		///
514		/// The channel can be opened only after the recipient confirms it and only on a session
515		/// change.
516		#[pezpallet::call_index(0)]
517		#[pezpallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
518		pub fn hrmp_init_open_channel(
519			origin: OriginFor<T>,
520			recipient: ParaId,
521			proposed_max_capacity: u32,
522			proposed_max_message_size: u32,
523		) -> DispatchResult {
524			let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
525			Self::init_open_channel(
526				origin,
527				recipient,
528				proposed_max_capacity,
529				proposed_max_message_size,
530			)?;
531			Self::deposit_event(Event::OpenChannelRequested {
532				sender: origin,
533				recipient,
534				proposed_max_capacity,
535				proposed_max_message_size,
536			});
537			Ok(())
538		}
539
540		/// Accept a pending open channel request from the given sender.
541		///
542		/// The channel will be opened only on the next session boundary.
543		#[pezpallet::call_index(1)]
544		#[pezpallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
545		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
546			let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
547			Self::accept_open_channel(origin, sender)?;
548			Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
549			Ok(())
550		}
551
552		/// Initiate unilateral closing of a channel. The origin must be either the sender or the
553		/// recipient in the channel being closed.
554		///
555		/// The closure can only happen on a session change.
556		#[pezpallet::call_index(2)]
557		#[pezpallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
558		pub fn hrmp_close_channel(
559			origin: OriginFor<T>,
560			channel_id: HrmpChannelId,
561		) -> DispatchResult {
562			let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
563			Self::close_channel(origin, channel_id.clone())?;
564			Self::deposit_event(Event::ChannelClosed { by_teyrchain: origin, channel_id });
565			Ok(())
566		}
567
568		/// This extrinsic triggers the cleanup of all the HRMP storage items that a para may have.
569		/// Normally this happens once per session, but this allows you to trigger the cleanup
570		/// immediately for a specific teyrchain.
571		///
572		/// Number of inbound and outbound channels for `para` must be provided as witness data.
573		///
574		/// Origin must be the `ChannelManager`.
575		#[pezpallet::call_index(3)]
576		#[pezpallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
577		pub fn force_clean_hrmp(
578			origin: OriginFor<T>,
579			para: ParaId,
580			num_inbound: u32,
581			num_outbound: u32,
582		) -> DispatchResult {
583			T::ChannelManager::ensure_origin(origin)?;
584
585			ensure!(
586				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default()
587					<= num_inbound as usize,
588				Error::<T>::WrongWitness
589			);
590			ensure!(
591				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default()
592					<= num_outbound as usize,
593				Error::<T>::WrongWitness
594			);
595
596			Self::clean_hrmp_after_outgoing(&para);
597			Ok(())
598		}
599
600		/// Force process HRMP open channel requests.
601		///
602		/// If there are pending HRMP open channel requests, you can use this function to process
603		/// all of those requests immediately.
604		///
605		/// Total number of opening channels must be provided as witness data.
606		///
607		/// Origin must be the `ChannelManager`.
608		#[pezpallet::call_index(4)]
609		#[pezpallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
610		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
611			T::ChannelManager::ensure_origin(origin)?;
612
613			ensure!(
614				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32
615					<= channels,
616				Error::<T>::WrongWitness
617			);
618
619			let host_config = configuration::ActiveConfig::<T>::get();
620			Self::process_hrmp_open_channel_requests(&host_config);
621			Ok(())
622		}
623
624		/// Force process HRMP close channel requests.
625		///
626		/// If there are pending HRMP close channel requests, you can use this function to process
627		/// all of those requests immediately.
628		///
629		/// Total number of closing channels must be provided as witness data.
630		///
631		/// Origin must be the `ChannelManager`.
632		#[pezpallet::call_index(5)]
633		#[pezpallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
634		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
635			T::ChannelManager::ensure_origin(origin)?;
636
637			ensure!(
638				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32
639					<= channels,
640				Error::<T>::WrongWitness
641			);
642
643			Self::process_hrmp_close_channel_requests();
644			Ok(())
645		}
646
647		/// This cancels a pending open channel request. It can be canceled by either of the sender
648		/// or the recipient for that request. The origin must be either of those.
649		///
650		/// The cancellation happens immediately. It is not possible to cancel the request if it is
651		/// already accepted.
652		///
653		/// Total number of open requests (i.e. `HrmpOpenChannelRequestsList`) must be provided as
654		/// witness data.
655		#[pezpallet::call_index(6)]
656		#[pezpallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
657		pub fn hrmp_cancel_open_request(
658			origin: OriginFor<T>,
659			channel_id: HrmpChannelId,
660			open_requests: u32,
661		) -> DispatchResult {
662			let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
663			ensure!(
664				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32
665					<= open_requests,
666				Error::<T>::WrongWitness
667			);
668			Self::cancel_open_request(origin, channel_id.clone())?;
669			Self::deposit_event(Event::OpenChannelCanceled { by_teyrchain: origin, channel_id });
670			Ok(())
671		}
672
673		/// Open a channel from a `sender` to a `recipient` `ParaId`. Although opened by governance,
674		/// the `max_capacity` and `max_message_size` are still subject to the Relay Chain's
675		/// configured limits.
676		///
677		/// Expected use is when one (and only one) of the `ParaId`s involved in the channel is
678		/// governed by the system, e.g. a system teyrchain.
679		///
680		/// Origin must be the `ChannelManager`.
681		#[pezpallet::call_index(7)]
682		#[pezpallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
683		pub fn force_open_hrmp_channel(
684			origin: OriginFor<T>,
685			sender: ParaId,
686			recipient: ParaId,
687			max_capacity: u32,
688			max_message_size: u32,
689		) -> DispatchResultWithPostInfo {
690			T::ChannelManager::ensure_origin(origin)?;
691
692			// Guard against a common footgun where someone makes a channel request to a system
693			// teyrchain and then makes a proposal to open the channel via governance, which fails
694			// because `init_open_channel` fails if there is an existing request. This check will
695			// clear an existing request such that `init_open_channel` should otherwise succeed.
696			let channel_id = HrmpChannelId { sender, recipient };
697			let cancel_request: u32 =
698				if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
699					Self::cancel_open_request(sender, channel_id)?;
700					1
701				} else {
702					0
703				};
704
705			// Now we proceed with normal init/accept, except that we set `no_deposit` to true such
706			// that it will not require deposits from either member.
707			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
708			Self::accept_open_channel(recipient, sender)?;
709			Self::deposit_event(Event::HrmpChannelForceOpened {
710				sender,
711				recipient,
712				proposed_max_capacity: max_capacity,
713				proposed_max_message_size: max_message_size,
714			});
715
716			Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
717		}
718
719		/// Establish an HRMP channel between two system chains. If the channel does not already
720		/// exist, the transaction fees will be refunded to the caller. The system does not take
721		/// deposits for channels between system chains, and automatically sets the message number
722		/// and size limits to the maximum allowed by the network's configuration.
723		///
724		/// Arguments:
725		///
726		/// - `sender`: A system chain, `ParaId`.
727		/// - `recipient`: A system chain, `ParaId`.
728		///
729		/// Any signed origin can call this function, but _both_ inputs MUST be system chains. If
730		/// the channel does not exist yet, there is no fee.
731		#[pezpallet::call_index(8)]
732		#[pezpallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
733		pub fn establish_system_channel(
734			origin: OriginFor<T>,
735			sender: ParaId,
736			recipient: ParaId,
737		) -> DispatchResultWithPostInfo {
738			let _caller = ensure_signed(origin)?;
739
740			// both chains must be system
741			ensure!(
742				sender.is_system() && recipient.is_system(),
743				Error::<T>::ChannelCreationNotAuthorized
744			);
745
746			let config = configuration::ActiveConfig::<T>::get();
747			let max_message_size = config.hrmp_channel_max_message_size;
748			let max_capacity = config.hrmp_channel_max_capacity;
749
750			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
751			Self::accept_open_channel(recipient, sender)?;
752
753			Self::deposit_event(Event::HrmpSystemChannelOpened {
754				sender,
755				recipient,
756				proposed_max_capacity: max_capacity,
757				proposed_max_message_size: max_message_size,
758			});
759
760			Ok(Pays::No.into())
761		}
762
763		/// Update the deposits held for an HRMP channel to the latest `Configuration`. Channels
764		/// with system chains do not require a deposit.
765		///
766		/// Arguments:
767		///
768		/// - `sender`: A chain, `ParaId`.
769		/// - `recipient`: A chain, `ParaId`.
770		///
771		/// Any signed origin can call this function.
772		#[pezpallet::call_index(9)]
773		#[pezpallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
774		pub fn poke_channel_deposits(
775			origin: OriginFor<T>,
776			sender: ParaId,
777			recipient: ParaId,
778		) -> DispatchResult {
779			let _caller = ensure_signed(origin)?;
780			let channel_id = HrmpChannelId { sender, recipient };
781			let is_system = sender.is_system() || recipient.is_system();
782
783			let config = configuration::ActiveConfig::<T>::get();
784
785			// Channels with and amongst the system do not require a deposit.
786			let (new_sender_deposit, new_recipient_deposit) = if is_system {
787				(0, 0)
788			} else {
789				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
790			};
791
792			HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
793				if let Some(ref mut channel) = channel {
794					let current_sender_deposit = channel.sender_deposit;
795					let current_recipient_deposit = channel.recipient_deposit;
796
797					// nothing to update
798					if current_sender_deposit == new_sender_deposit
799						&& current_recipient_deposit == new_recipient_deposit
800					{
801						return Ok(());
802					}
803
804					// sender
805					if current_sender_deposit > new_sender_deposit {
806						// Can never underflow, but be paranoid.
807						let amount = current_sender_deposit
808							.checked_sub(new_sender_deposit)
809							.ok_or(ArithmeticError::Underflow)?;
810						T::Currency::unreserve(
811							&channel_id.sender.into_account_truncating(),
812							// The difference should always be convertible into `Balance`, but be
813							// paranoid and do nothing in case.
814							amount.try_into().unwrap_or(Zero::zero()),
815						);
816					} else if current_sender_deposit < new_sender_deposit {
817						let amount = new_sender_deposit
818							.checked_sub(current_sender_deposit)
819							.ok_or(ArithmeticError::Underflow)?;
820						T::Currency::reserve(
821							&channel_id.sender.into_account_truncating(),
822							amount.try_into().unwrap_or(Zero::zero()),
823						)?;
824					}
825
826					// recipient
827					if current_recipient_deposit > new_recipient_deposit {
828						let amount = current_recipient_deposit
829							.checked_sub(new_recipient_deposit)
830							.ok_or(ArithmeticError::Underflow)?;
831						T::Currency::unreserve(
832							&channel_id.recipient.into_account_truncating(),
833							amount.try_into().unwrap_or(Zero::zero()),
834						);
835					} else if current_recipient_deposit < new_recipient_deposit {
836						let amount = new_recipient_deposit
837							.checked_sub(current_recipient_deposit)
838							.ok_or(ArithmeticError::Underflow)?;
839						T::Currency::reserve(
840							&channel_id.recipient.into_account_truncating(),
841							amount.try_into().unwrap_or(Zero::zero()),
842						)?;
843					}
844
845					// update storage
846					channel.sender_deposit = new_sender_deposit;
847					channel.recipient_deposit = new_recipient_deposit;
848				} else {
849					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into());
850				}
851				Ok(())
852			})?;
853
854			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
855
856			Ok(())
857		}
858
859		/// Establish a bidirectional HRMP channel between a teyrchain and a system chain.
860		///
861		/// Arguments:
862		///
863		/// - `target_system_chain`: A system chain, `ParaId`.
864		///
865		/// The origin needs to be the teyrchain origin.
866		#[pezpallet::call_index(10)]
867		#[pezpallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
868		pub fn establish_channel_with_system(
869			origin: OriginFor<T>,
870			target_system_chain: ParaId,
871		) -> DispatchResultWithPostInfo {
872			let sender = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
873
874			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
875
876			let (max_message_size, max_capacity) =
877				T::DefaultChannelSizeAndCapacityWithSystem::get();
878
879			// create bidirectional channel
880			Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
881			Self::accept_open_channel(target_system_chain, sender)?;
882
883			Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
884			Self::accept_open_channel(sender, target_system_chain)?;
885
886			Self::deposit_event(Event::HrmpSystemChannelOpened {
887				sender,
888				recipient: target_system_chain,
889				proposed_max_capacity: max_capacity,
890				proposed_max_message_size: max_message_size,
891			});
892
893			Self::deposit_event(Event::HrmpSystemChannelOpened {
894				sender: target_system_chain,
895				recipient: sender,
896				proposed_max_capacity: max_capacity,
897				proposed_max_message_size: max_message_size,
898			});
899
900			Ok(Pays::No.into())
901		}
902	}
903}
904
905fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
906	let host_config = configuration::ActiveConfig::<T>::get();
907	for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
908		if let Err(err) =
909			preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
910		{
911			panic!("failed to initialize the genesis storage: {:?}", err);
912		}
913	}
914	Pezpallet::<T>::process_hrmp_open_channel_requests(&host_config);
915}
916
917fn preopen_hrmp_channel<T: Config>(
918	sender: ParaId,
919	recipient: ParaId,
920	max_capacity: u32,
921	max_message_size: u32,
922) -> DispatchResult {
923	Pezpallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
924	Pezpallet::<T>::accept_open_channel(recipient, sender)?;
925	Ok(())
926}
927
928/// Routines and getters related to HRMP.
929impl<T: Config> Pezpallet<T> {
	/// Block initialization logic, called by initializer.
	///
	/// Performs no work: the block number is ignored and the reported weight is always zero.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}
934
	/// Block finalization logic, called by initializer.
	///
	/// Currently a no-op: the body is empty.
	pub(crate) fn initializer_finalize() {}
937
938	/// Called by the initializer to note that a new session has started.
939	pub(crate) fn initializer_on_new_session(
940		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
941		outgoing_paras: &[ParaId],
942	) -> Weight {
943		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
944		Self::process_hrmp_open_channel_requests(&notification.prev_config);
945		Self::process_hrmp_close_channel_requests();
946		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
947			outgoing_paras.len() as u32
948		))
949		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
950			outgoing_paras.len() as u32,
951		))
952	}
953
954	/// Iterate over all paras that were noted for offboarding and remove all the data
955	/// associated with them.
956	fn perform_outgoing_para_cleanup(
957		config: &HostConfiguration<BlockNumberFor<T>>,
958		outgoing: &[ParaId],
959	) -> Weight {
960		let mut w = Self::clean_open_channel_requests(config, outgoing);
961		for outgoing_para in outgoing {
962			Self::clean_hrmp_after_outgoing(outgoing_para);
963
964			// we need a few extra bits of data to weigh this -- all of this is read internally
965			// anyways, so no overhead.
966			let ingress_count =
967				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
968			let egress_count =
969				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
970			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
971				ingress_count,
972				egress_count,
973			));
974		}
975		w
976	}
977
978	// Go over the HRMP open channel requests and remove all in which offboarding paras participate.
979	//
980	// This will also perform the refunds for the counterparty if it doesn't offboard.
981	pub(crate) fn clean_open_channel_requests(
982		config: &HostConfiguration<BlockNumberFor<T>>,
983		outgoing: &[ParaId],
984	) -> Weight {
985		// First collect all the channel ids of the open requests in which there is at least one
986		// party presents in the outgoing list.
987		//
988		// Both the open channel request list and outgoing list are expected to be small enough.
989		// In the most common case there will be only single outgoing para.
990		let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
991		let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
992			.into_iter()
993			.partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
994		HrmpOpenChannelRequestsList::<T>::put(stay);
995
996		// Then iterate over all open requests to be removed, pull them out of the set and perform
997		// the refunds if applicable.
998		for req_id in go {
999			let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
1000				Some(req_data) => req_data,
1001				None => {
1002					// Can't normally happen but no need to panic.
1003					continue;
1004				},
1005			};
1006
1007			// Return the deposit of the sender, but only if it is not the para being offboarded.
1008			if !outgoing.contains(&req_id.sender) {
1009				T::Currency::unreserve(
1010					&req_id.sender.into_account_truncating(),
1011					req_data.sender_deposit.unique_saturated_into(),
1012				);
1013			}
1014
1015			// If the request was confirmed, then it means it was confirmed in the finished session.
1016			// Therefore, the config's hrmp_recipient_deposit represents the actual value of the
1017			// deposit.
1018			//
1019			// We still want to refund the deposit only if the para is not being offboarded.
1020			if req_data.confirmed {
1021				if !outgoing.contains(&req_id.recipient) {
1022					T::Currency::unreserve(
1023						&req_id.recipient.into_account_truncating(),
1024						config.hrmp_recipient_deposit.unique_saturated_into(),
1025					);
1026				}
1027				Self::decrease_accepted_channel_request_count(req_id.recipient);
1028			}
1029		}
1030
1031		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1032	}
1033
1034	/// Remove all storage entries associated with the given para.
1035	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1036		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1037		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1038
1039		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1040			.into_iter()
1041			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1042		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1043			.into_iter()
1044			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1045		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1046		to_close.sort();
1047		to_close.dedup();
1048
1049		for channel in to_close {
1050			Self::close_hrmp_channel(&channel);
1051		}
1052	}
1053
1054	/// Iterate over all open channel requests and:
1055	///
1056	/// - prune the stale requests
1057	/// - enact the confirmed requests
1058	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1059		let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1060		if open_req_channels.is_empty() {
1061			return;
1062		}
1063
1064		// iterate the vector starting from the end making our way to the beginning. This way we
1065		// can leverage `swap_remove` to efficiently remove an item during iteration.
1066		let mut idx = open_req_channels.len();
1067		loop {
1068			// bail if we've iterated over all items.
1069			if idx == 0 {
1070				break;
1071			}
1072
1073			idx -= 1;
1074			let channel_id = open_req_channels[idx].clone();
1075			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1076				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
1077			);
1078
1079			let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1080			let sender_deposit = request.sender_deposit;
1081			let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1082
1083			if request.confirmed {
1084				if paras::Pezpallet::<T>::is_valid_para(channel_id.sender)
1085					&& paras::Pezpallet::<T>::is_valid_para(channel_id.recipient)
1086				{
1087					HrmpChannels::<T>::insert(
1088						&channel_id,
1089						HrmpChannel {
1090							sender_deposit,
1091							recipient_deposit,
1092							max_capacity: request.max_capacity,
1093							max_total_size: request.max_total_size,
1094							max_message_size: request.max_message_size,
1095							msg_count: 0,
1096							total_size: 0,
1097							mqc_head: None,
1098						},
1099					);
1100
1101					HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1102						if let Err(i) = v.binary_search(&channel_id.sender) {
1103							v.insert(i, channel_id.sender);
1104						}
1105					});
1106					HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1107						if let Err(i) = v.binary_search(&channel_id.recipient) {
1108							v.insert(i, channel_id.recipient);
1109						}
1110					});
1111				}
1112
1113				Self::decrease_open_channel_request_count(channel_id.sender);
1114				Self::decrease_accepted_channel_request_count(channel_id.recipient);
1115
1116				let _ = open_req_channels.swap_remove(idx);
1117				HrmpOpenChannelRequests::<T>::remove(&channel_id);
1118			}
1119		}
1120
1121		HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1122	}
1123
1124	/// Iterate over all close channel requests unconditionally closing the channels.
1125	fn process_hrmp_close_channel_requests() {
1126		let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1127		for condemned_ch_id in close_reqs {
1128			HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1129			Self::close_hrmp_channel(&condemned_ch_id);
1130		}
1131	}
1132
1133	/// Close and remove the designated HRMP channel.
1134	///
1135	/// This includes returning the deposits.
1136	///
1137	/// This function is idempotent, meaning that after the first application it should have no
1138	/// effect (i.e. it won't return the deposits twice).
1139	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1140		if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1141			HrmpChannels::<T>::take(channel_id)
1142		{
1143			T::Currency::unreserve(
1144				&channel_id.sender.into_account_truncating(),
1145				sender_deposit.unique_saturated_into(),
1146			);
1147			T::Currency::unreserve(
1148				&channel_id.recipient.into_account_truncating(),
1149				recipient_deposit.unique_saturated_into(),
1150			);
1151		}
1152
1153		HrmpChannelContents::<T>::remove(channel_id);
1154
1155		HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1156			if let Ok(i) = v.binary_search(&channel_id.recipient) {
1157				v.remove(i);
1158			}
1159		});
1160		HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1161			if let Ok(i) = v.binary_search(&channel_id.sender) {
1162				v.remove(i);
1163			}
1164		});
1165	}
1166
1167	/// Check that the candidate of the given recipient controls the HRMP watermark properly.
1168	pub(crate) fn check_hrmp_watermark(
1169		recipient: ParaId,
1170		relay_chain_parent_number: BlockNumberFor<T>,
1171		new_hrmp_watermark: BlockNumberFor<T>,
1172	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1173		// First, check where the watermark CANNOT legally land.
1174		//
1175		// (a) For ensuring that messages are eventually processed, we require each parablock's
1176		//     watermark to be greater than the last one. The exception to this is if the previous
1177		//     watermark was already equal to the current relay-parent number.
1178		//
1179		// (b) However, a teyrchain cannot read into "the future", therefore the watermark should
1180		//     not be greater than the relay-chain context block which the parablock refers to.
1181		if new_hrmp_watermark == relay_chain_parent_number {
1182			return Ok(());
1183		}
1184
1185		if new_hrmp_watermark > relay_chain_parent_number {
1186			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1187				new_watermark: new_hrmp_watermark,
1188				relay_chain_parent_number,
1189			});
1190		}
1191
1192		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1193			if new_hrmp_watermark < last_watermark {
1194				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1195					new_watermark: new_hrmp_watermark,
1196					last_watermark,
1197				});
1198			}
1199
1200			if new_hrmp_watermark == last_watermark {
1201				return Ok(());
1202			}
1203		}
1204
1205		// Second, check where the watermark CAN land. It's one of the following:
1206		//
1207		// (a) The relay parent block number (checked above).
1208		// (b) A relay-chain block in which this para received at least one message (checked here)
1209		let digest = HrmpChannelDigests::<T>::get(&recipient);
1210		if !digest
1211			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1212			.is_ok()
1213		{
1214			return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1215				new_watermark: new_hrmp_watermark,
1216			});
1217		}
1218		Ok(())
1219	}
1220
1221	/// Returns HRMP watermarks of previously sent messages to a given para.
1222	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1223		let mut valid_watermarks: Vec<_> = HrmpChannelDigests::<T>::get(&recipient)
1224			.into_iter()
1225			.map(|(block_no, _)| block_no)
1226			.collect();
1227
1228		// The current watermark will remain valid until updated.
1229		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1230			if valid_watermarks.first().map_or(false, |w| w > &last_watermark) {
1231				valid_watermarks.insert(0, last_watermark);
1232			}
1233		}
1234
1235		valid_watermarks
1236	}
1237
1238	pub(crate) fn check_outbound_hrmp(
1239		config: &HostConfiguration<BlockNumberFor<T>>,
1240		sender: ParaId,
1241		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1242	) -> Result<(), OutboundHrmpAcceptanceErr> {
1243		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1244			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1245				sent: out_hrmp_msgs.len() as u32,
1246				permitted: config.hrmp_max_message_num_per_candidate,
1247			});
1248		}
1249
1250		let mut last_recipient = None::<ParaId>;
1251
1252		for (idx, out_msg) in
1253			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1254		{
1255			match last_recipient {
1256				// the messages must be sorted in ascending order and there must be no two messages
1257				// sent to the same recipient. Thus we can check that every recipient is strictly
1258				// greater than the previous one.
1259				Some(last_recipient) if out_msg.recipient <= last_recipient => {
1260					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx })
1261				},
1262				_ => last_recipient = Some(out_msg.recipient),
1263			}
1264
1265			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1266
1267			let channel = match HrmpChannels::<T>::get(&channel_id) {
1268				Some(channel) => channel,
1269				None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1270			};
1271
1272			let msg_size = out_msg.data.len() as u32;
1273			if msg_size > channel.max_message_size {
1274				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1275					idx,
1276					msg_size,
1277					max_size: channel.max_message_size,
1278				});
1279			}
1280
1281			let new_total_size = channel.total_size + out_msg.data.len() as u32;
1282			if new_total_size > channel.max_total_size {
1283				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1284					idx,
1285					total_size: new_total_size,
1286					limit: channel.max_total_size,
1287				});
1288			}
1289
1290			let new_msg_count = channel.msg_count + 1;
1291			if new_msg_count > channel.max_capacity {
1292				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1293					idx,
1294					count: new_msg_count,
1295					limit: channel.max_capacity,
1296				});
1297			}
1298		}
1299
1300		Ok(())
1301	}
1302
1303	/// Returns remaining outbound channels capacity in messages and in bytes per recipient para.
1304	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1305		let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1306		let mut remaining = Vec::with_capacity(recipients.len());
1307
1308		for recipient in recipients {
1309			let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1310				continue;
1311			};
1312			remaining.push((
1313				recipient,
1314				(
1315					channel.max_capacity - channel.msg_count,
1316					channel.max_total_size - channel.total_size,
1317				),
1318			));
1319		}
1320
1321		remaining
1322	}
1323
1324	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1325		// sift through the incoming messages digest to collect the paras that sent at least one
1326		// message to this teyrchain between the old and new watermarks.
1327		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1328			let mut senders = BTreeSet::new();
1329			let mut leftover = Vec::with_capacity(digest.len());
1330			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1331				if block_no <= new_hrmp_watermark {
1332					senders.extend(paras_sent_msg);
1333				} else {
1334					leftover.push((block_no, paras_sent_msg));
1335				}
1336			}
1337			*digest = leftover;
1338			senders
1339		});
1340
1341		// having all senders we can trivially find out the channels which we need to prune.
1342		let channels_to_prune =
1343			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1344		for channel_id in channels_to_prune {
1345			// prune each channel up to the new watermark keeping track how many messages we removed
1346			// and what is the total byte size of them.
1347			let (mut pruned_cnt, mut pruned_size) = (0, 0);
1348
1349			let contents = HrmpChannelContents::<T>::get(&channel_id);
1350			let mut leftover = Vec::with_capacity(contents.len());
1351			for msg in contents {
1352				if msg.sent_at <= new_hrmp_watermark {
1353					pruned_cnt += 1;
1354					pruned_size += msg.data.len();
1355				} else {
1356					leftover.push(msg);
1357				}
1358			}
1359			if !leftover.is_empty() {
1360				HrmpChannelContents::<T>::insert(&channel_id, leftover);
1361			} else {
1362				HrmpChannelContents::<T>::remove(&channel_id);
1363			}
1364
1365			// update the channel metadata.
1366			HrmpChannels::<T>::mutate(&channel_id, |channel| {
1367				if let Some(ref mut channel) = channel {
1368					channel.msg_count -= pruned_cnt as u32;
1369					channel.total_size -= pruned_size as u32;
1370				}
1371			});
1372		}
1373
1374		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1375	}
1376
1377	/// Process the outbound HRMP messages by putting them into the appropriate recipient queues.
1378	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1379		let now = pezframe_system::Pezpallet::<T>::block_number();
1380
1381		for out_msg in out_hrmp_msgs {
1382			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1383
1384			let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1385				Some(channel) => channel,
1386				None => {
1387					// apparently, that since acceptance of this candidate the recipient was
1388					// offboarded and the channel no longer exists.
1389					continue;
1390				},
1391			};
1392
1393			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1394
1395			// book keeping
1396			channel.msg_count += 1;
1397			channel.total_size += inbound.data.len() as u32;
1398
1399			// compute the new MQC head of the channel
1400			let prev_head = channel.mqc_head.unwrap_or(Default::default());
1401			let new_head = BlakeTwo256::hash_of(&(
1402				prev_head,
1403				inbound.sent_at,
1404				T::Hashing::hash_of(&inbound.data),
1405			));
1406			channel.mqc_head = Some(new_head);
1407
1408			HrmpChannels::<T>::insert(&channel_id, channel);
1409			HrmpChannelContents::<T>::append(&channel_id, inbound);
1410
1411			// The digests are sorted in ascending by block number order. There are only two
1412			// possible scenarios here ("the current" is the block of candidate's inclusion):
1413			//
1414			// (a) It's the first time anybody sends a message to this recipient within this block.
1415			//     In this case, the digest vector would be empty or the block number of the latest
1416			//     entry is smaller than the current.
1417			//
1418			// (b) Somebody has already sent a message within the current block. That means that
1419			//     the block number of the latest entry is equal to the current.
1420			//
1421			// Note that having the latest entry greater than the current block number is a logical
1422			// error.
1423			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1424			if let Some(cur_block_digest) = recipient_digest
1425				.last_mut()
1426				.filter(|(block_no, _)| *block_no == now)
1427				.map(|(_, ref mut d)| d)
1428			{
1429				cur_block_digest.push(sender);
1430			} else {
1431				recipient_digest.push((now, vec![sender]));
1432			}
1433			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1434		}
1435	}
1436
	/// Initiate opening a channel from a teyrchain to a given recipient with given channel
	/// parameters. If neither chain is part of the system, then a deposit from the `Configuration`
	/// will be required for `origin` (the sender) upon opening the request and the `recipient` upon
	/// accepting it.
	///
	/// Basically the same as [`hrmp_init_open_channel`](Pezpallet::hrmp_init_open_channel) but
	/// intended for calling directly from other pallets rather than dispatched.
	///
	/// The request is rejected when `origin` equals `recipient`, when the recipient is not a
	/// valid para, when the proposed capacity or message size is zero or above the configured
	/// maximum, when a request or a channel for this pair already exists, when the sender has
	/// reached its outbound channel limit, or when the deposit cannot be reserved.
	///
	/// On success the request is stored as unconfirmed and a notification is sent to the
	/// recipient.
	pub fn init_open_channel(
		origin: ParaId,
		recipient: ParaId,
		proposed_max_capacity: u32,
		proposed_max_message_size: u32,
	) -> DispatchResult {
		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
		ensure!(
			paras::Pezpallet::<T>::is_valid_para(recipient),
			Error::<T>::OpenHrmpChannelInvalidRecipient,
		);

		// The proposed parameters must be non-zero and within the configured maxima.
		let config = configuration::ActiveConfig::<T>::get();
		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
		ensure!(
			proposed_max_capacity <= config.hrmp_channel_max_capacity,
			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
		);
		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
		ensure!(
			proposed_max_message_size <= config.hrmp_channel_max_message_size,
			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
		);

		// Neither a pending request nor an established channel may exist for this pair.
		let channel_id = HrmpChannelId { sender: origin, recipient };
		ensure!(
			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyRequested,
		);
		ensure!(
			HrmpChannels::<T>::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyExists,
		);

		// Established egress channels and pending open requests both count towards the
		// sender's outbound channel limit.
		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
		let channel_num_limit = config.hrmp_max_teyrchain_outbound_channels;
		ensure!(
			egress_cnt + open_req_cnt < channel_num_limit,
			Error::<T>::OpenHrmpChannelLimitExceeded,
		);

		// Do not require deposits for channels with or amongst the system.
		let is_system = origin.is_system() || recipient.is_system();
		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
		if !deposit.is_zero() {
			T::Currency::reserve(
				&origin.into_account_truncating(),
				deposit.unique_saturated_into(),
			)?;
		}

		// mutating storage directly now -- shall not bail henceforth.

		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
		// The request remembers the deposit actually reserved and takes the total size limit
		// from the active configuration.
		HrmpOpenChannelRequests::<T>::insert(
			&channel_id,
			HrmpOpenChannelRequest {
				confirmed: false,
				_age: 0,
				sender_deposit: deposit,
				max_capacity: proposed_max_capacity,
				max_message_size: proposed_max_message_size,
				max_total_size: config.hrmp_channel_max_total_size,
			},
		);
		HrmpOpenChannelRequestsList::<T>::append(channel_id);

		// Let the recipient know about the request.
		Self::send_to_para(
			"init_open_channel",
			&config,
			recipient,
			Self::wrap_notification(|| {
				use xcm::opaque::latest::{prelude::*, Xcm};
				Xcm(vec![HrmpNewChannelOpenRequest {
					sender: origin.into(),
					max_capacity: proposed_max_capacity,
					max_message_size: proposed_max_message_size,
				}])
			}),
		);

		Ok(())
	}
1528
1529	/// Accept a pending open channel request from the given sender.
1530	///
1531	/// Basically the same as [`hrmp_accept_open_channel`](Pezpallet::hrmp_accept_open_channel) but
1532	/// intended for calling directly from other pallets rather than dispatched.
1533	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1534		let channel_id = HrmpChannelId { sender, recipient: origin };
1535		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1536			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1537		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1538
1539		// check if by accepting this open channel request, this teyrchain would exceed the
1540		// number of inbound channels.
1541		let config = configuration::ActiveConfig::<T>::get();
1542		let channel_num_limit = config.hrmp_max_teyrchain_inbound_channels;
1543		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1544		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1545		ensure!(
1546			ingress_cnt + accepted_cnt < channel_num_limit,
1547			Error::<T>::AcceptHrmpChannelLimitExceeded,
1548		);
1549
1550		// Do not require deposits for channels with or amongst the system.
1551		let is_system = origin.is_system() || sender.is_system();
1552		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1553		if !deposit.is_zero() {
1554			T::Currency::reserve(
1555				&origin.into_account_truncating(),
1556				deposit.unique_saturated_into(),
1557			)?;
1558		}
1559
1560		// persist the updated open channel request and then increment the number of accepted
1561		// channels.
1562		channel_req.confirmed = true;
1563		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1564		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1565
1566		Self::send_to_para(
1567			"accept_open_channel",
1568			&config,
1569			sender,
1570			Self::wrap_notification(|| {
1571				use xcm::opaque::latest::{prelude::*, Xcm};
1572				Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1573			}),
1574		);
1575
1576		Ok(())
1577	}
1578
1579	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1580		// check if the origin is allowed to close the channel.
1581		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1582
1583		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1584			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1585		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1586
1587		// Remove the request by the channel id and sync the accompanying list with the set.
1588		HrmpOpenChannelRequests::<T>::remove(&channel_id);
1589		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1590			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1591				open_req_channels.swap_remove(pos);
1592			}
1593		});
1594
1595		Self::decrease_open_channel_request_count(channel_id.sender);
1596		// Don't decrease `HrmpAcceptedChannelRequestCount` because we don't consider confirmed
1597		// requests here.
1598
1599		// Unreserve the sender's deposit. The recipient could not have left their deposit because
1600		// we ensured that the request is not confirmed.
1601		T::Currency::unreserve(
1602			&channel_id.sender.into_account_truncating(),
1603			open_channel_req.sender_deposit.unique_saturated_into(),
1604		);
1605
1606		Ok(())
1607	}
1608
1609	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1610		// check if the origin is allowed to close the channel.
1611		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1612
1613		// check if the channel requested to close does exist.
1614		ensure!(
1615			HrmpChannels::<T>::get(&channel_id).is_some(),
1616			Error::<T>::CloseHrmpChannelDoesntExist,
1617		);
1618
1619		// check that there is no outstanding close request for this channel
1620		ensure!(
1621			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1622			Error::<T>::CloseHrmpChannelAlreadyUnderway,
1623		);
1624
1625		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1626		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1627
1628		let config = configuration::ActiveConfig::<T>::get();
1629		let opposite_party =
1630			if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1631
1632		Self::send_to_para(
1633			"close_channel",
1634			&config,
1635			opposite_party,
1636			Self::wrap_notification(|| {
1637				use xcm::opaque::latest::{prelude::*, Xcm};
1638				Xcm(vec![HrmpChannelClosing {
1639					initiator: origin.into(),
1640					sender: channel_id.sender.into(),
1641					recipient: channel_id.recipient.into(),
1642				}])
1643			}),
1644		);
1645
1646		Ok(())
1647	}
1648
1649	/// Returns the list of MQC heads for the inbound channels of the given recipient para paired
1650	/// with the sender para ids. This vector is sorted ascending by the para id and doesn't contain
1651	/// multiple entries with the same sender.
1652	#[cfg(test)]
1653	fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
1654		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1655
1656		// The ingress channels vector is sorted, thus `mqc_heads` is sorted as well.
1657		let mut mqc_heads = Vec::with_capacity(sender_set.len());
1658		for sender in sender_set {
1659			let channel_metadata = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient });
1660			let mqc_head = channel_metadata
1661				.and_then(|metadata| metadata.mqc_head)
1662				.unwrap_or(Hash::default());
1663			mqc_heads.push((sender, mqc_head));
1664		}
1665
1666		mqc_heads
1667	}
1668
1669	/// Returns contents of all channels addressed to the given recipient. Channels that have no
1670	/// messages in them are also included.
1671	pub(crate) fn inbound_hrmp_channels_contents(
1672		recipient: ParaId,
1673	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1674		let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1675
1676		let mut inbound_hrmp_channels_contents = BTreeMap::new();
1677		for sender in sender_set {
1678			let channel_contents =
1679				HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1680			inbound_hrmp_channels_contents.insert(sender, channel_contents);
1681		}
1682
1683		inbound_hrmp_channels_contents
1684	}
1685}
1686
1687impl<T: Config> Pezpallet<T> {
1688	/// Decreases the open channel request count for the given sender. If the value reaches zero
1689	/// it is removed completely.
1690	fn decrease_open_channel_request_count(sender: ParaId) {
1691		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1692			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1693				0 => None,
1694				n => Some(n),
1695			});
1696		});
1697	}
1698
1699	/// Decreases the accepted channel request count for the given sender. If the value reaches
1700	/// zero it is removed completely.
1701	fn decrease_accepted_channel_request_count(recipient: ParaId) {
1702		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1703			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1704				0 => None,
1705				n => Some(n),
1706			});
1707		});
1708	}
1709
1710	#[cfg(any(feature = "runtime-benchmarks", test))]
1711	fn assert_storage_consistency_exhaustive() {
1712		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1713			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1714		}
1715
1716		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1717			for para in paras {
1718				assert!(
1719					crate::paras::Pezpallet::<T>::is_valid_para(para),
1720					"{}: {:?} para is offboarded",
1721					cause,
1722					para
1723				);
1724			}
1725		};
1726
1727		assert_eq!(
1728			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1729			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1730		);
1731
1732		// verify that the set of keys in `HrmpOpenChannelRequestCount` corresponds to the set
1733		// of _senders_ in `HrmpOpenChannelRequests`.
1734		//
1735		// having ensured that, we can go ahead and go over all counts and verify that they match.
1736		assert_eq!(
1737			HrmpOpenChannelRequestCount::<T>::iter()
1738				.map(|(k, _)| k)
1739				.collect::<BTreeSet<_>>(),
1740			HrmpOpenChannelRequests::<T>::iter()
1741				.map(|(k, _)| k.sender)
1742				.collect::<BTreeSet<_>>(),
1743		);
1744		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1745			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1746				.filter(|(ch, _)| ch.sender == open_channel_initiator)
1747				.count() as u32;
1748			assert_eq!(expected_num, actual_num);
1749		}
1750
1751		// The same as above, but for accepted channel request count. Note that we are interested
1752		// only in confirmed open requests.
1753		assert_eq!(
1754			HrmpAcceptedChannelRequestCount::<T>::iter()
1755				.map(|(k, _)| k)
1756				.collect::<BTreeSet<_>>(),
1757			HrmpOpenChannelRequests::<T>::iter()
1758				.filter(|(_, v)| v.confirmed)
1759				.map(|(k, _)| k.recipient)
1760				.collect::<BTreeSet<_>>(),
1761		);
1762		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1763			let actual_num = HrmpOpenChannelRequests::<T>::iter()
1764				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1765				.count() as u32;
1766			assert_eq!(expected_num, actual_num);
1767		}
1768
1769		assert_eq!(
1770			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1771			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1772		);
1773
1774		// A HRMP watermark can be None for an onboarded teyrchain. However, an offboarded teyrchain
1775		// cannot have an HRMP watermark: it should've been cleanup.
1776		assert_contains_only_onboarded(
1777			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1778			"HRMP watermarks should contain only onboarded paras",
1779		);
1780
1781		// An entry in `HrmpChannels` indicates that the channel is open. Only open channels can
1782		// have contents.
1783		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1784			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1785
1786			// pedantic check: there should be no empty vectors in storage, those should be modeled
1787			// by a removed kv pair.
1788			assert!(!contents.is_empty());
1789		}
1790
1791		// Senders and recipients must be onboarded. Otherwise, all channels associated with them
1792		// are removed.
1793		assert_contains_only_onboarded(
1794			HrmpChannels::<T>::iter()
1795				.flat_map(|(k, _)| vec![k.sender, k.recipient])
1796				.collect::<Vec<_>>(),
1797			"senders and recipients in all channels should be onboarded",
1798		);
1799
1800		// Check the docs for `HrmpIngressChannelsIndex` and `HrmpEgressChannelsIndex` in decl
1801		// storage to get an index what are the channel mappings indexes.
1802		//
1803		// Here, from indexes.
1804		//
1805		// ingress         egress
1806		//
1807		// a -> [x, y]     x -> [a, b]
1808		// b -> [x, z]     y -> [a]
1809		//                 z -> [b]
1810		//
1811		// we derive a list of channels they represent.
1812		//
1813		//   (a, x)         (a, x)
1814		//   (a, y)         (a, y)
1815		//   (b, x)         (b, x)
1816		//   (b, z)         (b, z)
1817		//
1818		// and then that we compare that to the channel list in the `HrmpChannels`.
1819		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1820			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1821			.collect::<BTreeSet<_>>();
1822		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1823			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1824			.collect::<BTreeSet<_>>();
1825		let channel_set_ground_truth = HrmpChannels::<T>::iter()
1826			.map(|(k, _)| (k.sender, k.recipient))
1827			.collect::<BTreeSet<_>>();
1828		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1829		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1830
1831		HrmpIngressChannelsIndex::<T>::iter()
1832			.map(|(_, v)| v)
1833			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1834		HrmpEgressChannelsIndex::<T>::iter()
1835			.map(|(_, v)| v)
1836			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1837
1838		assert_contains_only_onboarded(
1839			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1840			"HRMP channel digests should contain only onboarded paras",
1841		);
1842		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1843			// Assert that items are in **strictly** ascending order. The strictness also implies
1844			// there are no duplicates.
1845			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1846
1847			for (_, mut senders) in digest {
1848				assert!(!senders.is_empty());
1849
1850				// check for duplicates. For that we sort the vector, then perform deduplication.
1851				// if the vector stayed the same, there are no duplicates.
1852				senders.sort();
1853				let orig_senders = senders.clone();
1854				senders.dedup();
1855				assert_eq!(
1856					orig_senders, senders,
1857					"duplicates removed implies existence of duplicates"
1858				);
1859			}
1860		}
1861	}
1862}
1863
1864impl<T: Config> Pezpallet<T> {
1865	/// Wraps HRMP XCM notifications to the most suitable XCM version for the destination para.
1866	/// If the XCM version is unknown, the latest XCM version is used as a best effort.
1867	fn wrap_notification(
1868		mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1869	) -> impl FnOnce(ParaId) -> pezkuwi_primitives::DownwardMessage {
1870		use xcm::{
1871			opaque::VersionedXcm,
1872			prelude::{Junction, Location},
1873			WrapVersion,
1874		};
1875
1876		// Return a closure that can prepare notifications.
1877		move |dest| {
1878			// Attempt to wrap the notification for the destination teyrchain.
1879			T::VersionWrapper::wrap_version(
1880				&Location::new(0, [Junction::Teyrchain(dest.into())]),
1881				notification(),
1882			)
1883			.unwrap_or_else(|_| {
1884				// As a best effort, if we cannot resolve the version, fallback to using the latest
1885				// version.
1886				VersionedXcm::from(notification())
1887			})
1888			.encode()
1889		}
1890	}
1891
1892	/// Sends/enqueues notification to the destination teyrchain.
1893	fn send_to_para(
1894		log_label: &str,
1895		config: &HostConfiguration<BlockNumberFor<T>>,
1896		dest: ParaId,
1897		notification_bytes_for: impl FnOnce(ParaId) -> pezkuwi_primitives::DownwardMessage,
1898	) {
1899		// prepare notification
1900		let notification_bytes = notification_bytes_for(dest);
1901
1902		// try to enqueue
1903		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1904			dmp::Pezpallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1905		{
1906			// this should never happen unless the max downward message size is configured to a
1907			// jokingly small number.
1908			log::error!(
1909				target: "runtime::hrmp",
1910				"sending '{log_label}::notification_bytes' failed."
1911			);
1912			debug_assert!(false);
1913		}
1914	}
1915}