use crate::{
	configuration::{self, HostConfiguration},
	dmp, ensure_parachain, initializer, paras,
};
use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
use frame_system::pallet_prelude::*;
use parity_scale_codec::{Decode, Encode};
use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
use primitives::{
	Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
	SessionIndex,
};
use scale_info::TypeInfo;
use sp_runtime::{
	traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
	ArithmeticError,
};
use sp_std::{
	collections::{btree_map::BTreeMap, btree_set::BTreeSet},
	fmt, mem,
	prelude::*,
};
pub use pallet::*;
/// Upper bound on the number of inbound HRMP channels.
///
/// NOTE(review): not referenced in the visible part of this file — presumably consumed by
/// benchmarking and/or configuration consistency checks; confirm against callers.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Upper bound on the number of outbound HRMP channels.
///
/// NOTE(review): same caveat as for the inbound bound — usage is not visible here.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
#[cfg(test)]
pub(crate) mod tests;
#[cfg(feature = "runtime-benchmarks")]
mod benchmarking;
/// Weight functions required by this pallet.
///
/// Each function provides the weight of the extrinsic or internal routine with the same name.
/// The `u32` arguments are size hints supplied by the call sites in this file.
pub trait WeightInfo {
	/// Weight of the `hrmp_init_open_channel` extrinsic.
	fn hrmp_init_open_channel() -> Weight;
	/// Weight of the `hrmp_accept_open_channel` extrinsic.
	fn hrmp_accept_open_channel() -> Weight;
	/// Weight of the `hrmp_close_channel` extrinsic.
	fn hrmp_close_channel() -> Weight;
	/// Weight of cleaning up a para with `i` ingress and `e` egress channels.
	fn force_clean_hrmp(i: u32, e: u32) -> Weight;
	/// Weight of processing open-channel requests; `c` is the caller-supplied count.
	fn force_process_hrmp_open(c: u32) -> Weight;
	/// Weight of processing close-channel requests; `c` is the caller-supplied count.
	fn force_process_hrmp_close(c: u32) -> Weight;
	/// Weight of cancelling an open request; `c` is the witnessed number of open requests.
	fn hrmp_cancel_open_request(c: u32) -> Weight;
	/// Weight of purging open requests for `c` outgoing paras.
	fn clean_open_channel_requests(c: u32) -> Weight;
	/// Weight of force-opening a channel; `c` is the number of cancelled requests (0 or 1 at
	/// the call sites in this file).
	fn force_open_hrmp_channel(c: u32) -> Weight;
	/// Weight of the `establish_system_channel` extrinsic.
	fn establish_system_channel() -> Weight;
	/// Weight of the `poke_channel_deposits` extrinsic.
	fn poke_channel_deposits() -> Weight;
	/// Weight of the `establish_channel_with_system` extrinsic.
	fn establish_channel_with_system() -> Weight;
}
/// A [`WeightInfo`] implementation that reports `Weight::MAX` for every operation.
pub struct TestWeightInfo;

// Methods are listed in the same order as they are declared in `WeightInfo`.
impl WeightInfo for TestWeightInfo {
	fn hrmp_init_open_channel() -> Weight {
		Weight::MAX
	}

	fn hrmp_accept_open_channel() -> Weight {
		Weight::MAX
	}

	fn hrmp_close_channel() -> Weight {
		Weight::MAX
	}

	fn force_clean_hrmp(_ingress: u32, _egress: u32) -> Weight {
		Weight::MAX
	}

	fn force_process_hrmp_open(_channels: u32) -> Weight {
		Weight::MAX
	}

	fn force_process_hrmp_close(_channels: u32) -> Weight {
		Weight::MAX
	}

	fn hrmp_cancel_open_request(_open_requests: u32) -> Weight {
		Weight::MAX
	}

	fn clean_open_channel_requests(_outgoing: u32) -> Weight {
		Weight::MAX
	}

	fn force_open_hrmp_channel(_cancelled: u32) -> Weight {
		Weight::MAX
	}

	fn establish_system_channel() -> Weight {
		Weight::MAX
	}

	fn poke_channel_deposits() -> Weight {
		Weight::MAX
	}

	fn establish_channel_with_system() -> Weight {
		Weight::MAX
	}
}
/// A pending request to open an HRMP channel.
///
/// Stored in `HrmpOpenChannelRequests`; the limits recorded here are copied into the resulting
/// `HrmpChannel` when the request is processed.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
	/// Only requests with this flag set are turned into channels when open requests are
	/// processed. Presumably set once the recipient accepts — the setter is not visible here.
	pub confirmed: bool,
	/// NOTE(review): not read anywhere in the visible part of this file.
	pub _age: SessionIndex,
	/// The deposit recorded for the sender; it is unreserved if the request is purged while
	/// the sender stays, and carried over into the channel otherwise.
	pub sender_deposit: Balance,
	/// Becomes the channel's `max_message_size`.
	pub max_message_size: u32,
	/// Becomes the channel's `max_capacity`.
	pub max_capacity: u32,
	/// Becomes the channel's `max_total_size`.
	pub max_total_size: u32,
}
/// Metadata of an open HRMP channel, stored in `HrmpChannels`.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
	/// Upper limit for `msg_count`; checked when outbound messages are validated.
	pub max_capacity: u32,
	/// Upper limit for `total_size`; checked when outbound messages are validated.
	pub max_total_size: u32,
	/// Upper limit for the size of a single message sent over this channel.
	pub max_message_size: u32,
	/// Number of messages currently queued: incremented when a message is queued and
	/// decremented when messages are pruned.
	pub msg_count: u32,
	/// Total payload size of the queued messages, maintained alongside `msg_count`.
	pub total_size: u32,
	/// Head of the hash chain over the messages queued on this channel. Each new head is the
	/// hash of (previous head, send block number, hash of the message data); `None` until the
	/// first message is queued.
	pub mqc_head: Option<Hash>,
	/// Amount unreserved from the sender's account when the channel is closed.
	pub sender_deposit: Balance,
	/// Amount unreserved from the recipient's account when the channel is closed.
	pub recipient_deposit: Balance,
}
/// Reasons for which `check_hrmp_watermark` rejects a proposed HRMP watermark.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
	/// The new watermark is not strictly greater than the previously stored one.
	AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
	/// The new watermark is greater than the relay-parent block number.
	AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
	/// The new watermark is below the relay-parent number but there is no digest entry for
	/// that block, i.e. no messages were received by the recipient at that block.
	LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
/// Reasons for which `check_outbound_hrmp` rejects a batch of outbound HRMP messages.
///
/// `idx` is always the position of the offending message within the batch.
pub(crate) enum OutboundHrmpAcceptanceErr {
	/// The batch is longer than `hrmp_max_message_num_per_candidate` allows.
	MoreMessagesThanPermitted { sent: u32, permitted: u32 },
	/// Recipients are not in strictly ascending order.
	NotSorted { idx: u32 },
	/// There is no open channel from the sender to the message's recipient.
	NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
	/// The message is larger than the channel's `max_message_size`.
	MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
	/// Queuing the message would push the channel beyond its `max_total_size`.
	TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
	/// Queuing the message would push the channel beyond its `max_capacity`.
	CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
/// Renders each watermark rejection reason as a human-readable sentence, including the
/// offending values.
impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
where
	BlockNumber: fmt::Debug,
{
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		use HrmpWatermarkAcceptanceErr::*;
		match self {
			// This error is produced when `new_watermark <= last_watermark`, so the relation
			// printed next to the values must be `<=` (it used to claim `>`).
			AdvancementRule { new_watermark, last_watermark } => write!(
				fmt,
				"the HRMP watermark is not advanced relative to the last watermark ({:?} <= {:?})",
				new_watermark, last_watermark,
			),
			AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
				fmt,
				"the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
				new_watermark, relay_chain_parent_number
			),
			LandsOnBlockWithNoMessages { new_watermark } => write!(
				fmt,
				"the HRMP watermark ({:?}) doesn't land on a block with messages received",
				new_watermark
			),
		}
	}
}
impl fmt::Debug for OutboundHrmpAcceptanceErr {
	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
		use OutboundHrmpAcceptanceErr::*;
		match self {
			MoreMessagesThanPermitted { sent, permitted } => write!(
				fmt,
				"more HRMP messages than permitted by config ({} > {})",
				sent, permitted,
			),
			NotSorted { idx } => {
				write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
			},
			NoSuchChannel { idx, channel_id } => write!(
				fmt,
				"the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
				idx, channel_id.sender, channel_id.recipient,
			),
			MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
				fmt,
				"the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
				idx, msg_size, max_size,
			),
			TotalSizeExceeded { idx, total_size, limit } => write!(
				fmt,
				"sending the HRMP message at index {} would exceed the negotiated channel total size  ({} > {})",
				idx, total_size, limit,
			),
			CapacityExceeded { idx, count, limit } => write!(
				fmt,
				"sending the HRMP message at index {} would exceed the negotiated channel capacity  ({} > {})",
				idx, count, limit,
			),
		}
	}
}
#[frame_support::pallet]
pub mod pallet {
	use super::*;
	/// The HRMP pallet type. Dispatchables and the internal channel-management routines are
	/// implemented on it.
	#[pallet::pallet]
	#[pallet::without_storage_info]
	pub struct Pallet<T>(_);
	/// Configuration of the HRMP pallet.
	#[pallet::config]
	pub trait Config:
		frame_system::Config + configuration::Config + paras::Config + dmp::Config
	{
		/// The event type into which this pallet's `Event` is converted.
		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
		/// Origin type that the system origin is converted into before it is handed to
		/// `ensure_parachain` by the parachain-only extrinsics.
		type RuntimeOrigin: From<crate::Origin>
			+ From<<Self as frame_system::Config>::RuntimeOrigin>
			+ Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;
		/// Origin that is allowed to call the `force_*` extrinsics of this pallet.
		type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;
		/// Currency used to reserve and unreserve channel deposits on the accounts derived
		/// from the participating para ids.
		type Currency: ReservableCurrency<Self::AccountId>;
		/// Channel parameters used by `establish_channel_with_system`. That extrinsic reads
		/// the tuple as `(max_message_size, max_capacity)`, in that order.
		type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;
		/// Weight information for the operations of this pallet.
		type WeightInfo: WeightInfo;
	}
	/// Events deposited by the HRMP extrinsics.
	#[pallet::event]
	#[pallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Deposited by `hrmp_init_open_channel` after the request was registered.
		OpenChannelRequested {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Deposited by `hrmp_cancel_open_request`; `by_parachain` is the calling para.
		OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Deposited by `hrmp_accept_open_channel`; `recipient` is the calling para.
		OpenChannelAccepted { sender: ParaId, recipient: ParaId },
		/// Deposited by `hrmp_close_channel`; `by_parachain` is the calling para.
		ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
		/// Deposited by `force_open_hrmp_channel`.
		HrmpChannelForceOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Deposited by `establish_system_channel` (once) and by
		/// `establish_channel_with_system` (once per direction).
		HrmpSystemChannelOpened {
			sender: ParaId,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		},
		/// Deposited by `poke_channel_deposits`.
		OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
	}
	/// Errors returned by the HRMP extrinsics.
	///
	/// NOTE(review): several variants are only raised by routines outside the visible part of
	/// this file; their descriptions below are derived from the variant names and are marked
	/// as presumed.
	#[pallet::error]
	pub enum Error<T> {
		/// The sender tried to open a channel to itself.
		OpenHrmpChannelToSelf,
		/// The recipient is not a valid para.
		OpenHrmpChannelInvalidRecipient,
		/// The requested capacity is zero.
		OpenHrmpChannelZeroCapacity,
		/// The requested capacity exceeds the configured `hrmp_channel_max_capacity`.
		OpenHrmpChannelCapacityExceedsLimit,
		/// The requested maximum message size is zero.
		OpenHrmpChannelZeroMessageSize,
		/// The requested maximum message size exceeds the configured
		/// `hrmp_channel_max_message_size`.
		OpenHrmpChannelMessageSizeExceedsLimit,
		/// The channel already exists.
		OpenHrmpChannelAlreadyExists,
		/// There is already a request to open the same channel.
		OpenHrmpChannelAlreadyRequested,
		/// Presumably: the sender already has the maximum number of allowed outbound channels.
		OpenHrmpChannelLimitExceeded,
		/// Presumably: the open request being accepted does not exist.
		AcceptHrmpChannelDoesntExist,
		/// Presumably: the open request being accepted is already confirmed.
		AcceptHrmpChannelAlreadyConfirmed,
		/// Presumably: the recipient already has the maximum number of allowed inbound channels.
		AcceptHrmpChannelLimitExceeded,
		/// Presumably: the origin is neither the sender nor the recipient of the channel.
		CloseHrmpChannelUnauthorized,
		/// Presumably: the channel to be closed does not exist.
		CloseHrmpChannelDoesntExist,
		/// Presumably: the channel is already scheduled for closing.
		CloseHrmpChannelAlreadyUnderway,
		/// Presumably: the origin is not a participant of the request being cancelled.
		CancelHrmpOpenChannelUnauthorized,
		/// Returned by `poke_channel_deposits` when the channel does not exist.
		/// NOTE(review): possibly also used for a missing open request — not visible here.
		OpenHrmpChannelDoesntExist,
		/// Presumably: a confirmed open request cannot be cancelled.
		OpenHrmpChannelAlreadyConfirmed,
		/// The witness supplied to an extrinsic is smaller than the actual number of items.
		WrongWitness,
		/// The channel may not be created this way: a participant that must be a system chain
		/// is not one.
		ChannelCreationNotAuthorized,
	}
	/// Pending open-channel requests, keyed by the channel they would create.
	///
	/// Confirmed entries are consumed by `process_hrmp_open_channel_requests`; entries that
	/// involve an offboarded para are removed by `clean_open_channel_requests`.
	#[pallet::storage]
	pub type HrmpOpenChannelRequests<T: Config> =
		StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;
	/// The channel ids of the pending open-channel requests.
	///
	/// `process_hrmp_open_channel_requests` relies on the invariant that every id in this
	/// list has an entry in `HrmpOpenChannelRequests`.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
	/// A per-para counter, removed when the para is cleaned up.
	///
	/// NOTE(review): presumably the number of open requests the para has initiated as a
	/// sender — the code that maintains it is outside the visible part of this file.
	#[pallet::storage]
	pub type HrmpOpenChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
	/// A per-para counter, removed when the para is cleaned up.
	///
	/// NOTE(review): presumably the number of confirmed open requests in which the para is
	/// the recipient — the code that maintains it is outside the visible part of this file.
	#[pallet::storage]
	pub type HrmpAcceptedChannelRequestCount<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
	/// The set of channels that are scheduled to be closed; entries are removed by
	/// `process_hrmp_close_channel_requests`.
	#[pallet::storage]
	pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;
	/// The list of channels scheduled to be closed; drained by
	/// `process_hrmp_close_channel_requests`.
	#[pallet::storage]
	pub type HrmpCloseChannelRequestsList<T: Config> =
		StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
	/// The last HRMP watermark recorded for each recipient para; written by `prune_hrmp` and
	/// consulted by `check_hrmp_watermark`.
	#[pallet::storage]
	pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
	/// Metadata of every open HRMP channel.
	#[pallet::storage]
	pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
	/// For each recipient para, the senders that have a channel to it.
	///
	/// The list is maintained with binary-search insertion and removal, i.e. it is expected
	/// to be sorted and free of duplicates.
	#[pallet::storage]
	pub type HrmpIngressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
	/// For each sender para, the recipients it has a channel to.
	///
	/// Maintained the same way as the ingress index (sorted, no duplicates).
	#[pallet::storage]
	pub type HrmpEgressChannelsIndex<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
	/// The messages currently queued in each channel; appended to by `queue_outbound_hrmp`
	/// and trimmed by `prune_hrmp`.
	#[pallet::storage]
	pub type HrmpChannelContents<T: Config> = StorageMap<
		_,
		Twox64Concat,
		HrmpChannelId,
		Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
		ValueQuery,
	>;
	/// For each recipient para, a list of `(block number, senders)` pairs recording which
	/// paras queued messages for it at which block.
	///
	/// Entries are appended by `queue_outbound_hrmp` with the current block number and looked
	/// up by block number with a binary search in `check_hrmp_watermark`.
	#[pallet::storage]
	pub type HrmpChannelDigests<T: Config> =
		StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
	/// Genesis configuration of the HRMP pallet.
	#[pallet::genesis_config]
	#[derive(DefaultNoBound)]
	pub struct GenesisConfig<T: Config> {
		/// Marker tying the configuration to the runtime type; skipped during serialization.
		#[serde(skip)]
		_config: sp_std::marker::PhantomData<T>,
		/// Channels to open at genesis, each given as
		/// `(sender, recipient, max_capacity, max_message_size)`.
		preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
	}
	#[pallet::genesis_build]
	impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
		fn build(&self) {
			initialize_storage::<T>(&self.preopen_hrmp_channels);
		}
	}
	#[pallet::call]
	impl<T: Config> Pallet<T> {
		/// Requests opening a channel from the calling parachain to `recipient` with the
		/// proposed limits.
		///
		/// The origin must be a parachain. On success `OpenChannelRequested` is deposited.
		#[pallet::call_index(0)]
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
		pub fn hrmp_init_open_channel(
			origin: OriginFor<T>,
			recipient: ParaId,
			proposed_max_capacity: u32,
			proposed_max_message_size: u32,
		) -> DispatchResult {
			// The calling para is the sender of the prospective channel.
			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;

			Self::init_open_channel(
				sender,
				recipient,
				proposed_max_capacity,
				proposed_max_message_size,
			)?;

			let event = Event::<T>::OpenChannelRequested {
				sender,
				recipient,
				proposed_max_capacity,
				proposed_max_message_size,
			};
			Self::deposit_event(event);
			Ok(())
		}
		/// Accepts the open-channel request sent by `sender` to the calling parachain.
		///
		/// The origin must be a parachain. On success `OpenChannelAccepted` is deposited.
		#[pallet::call_index(1)]
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
		pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
			// The calling para is the recipient of the channel.
			let recipient = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
			Self::accept_open_channel(recipient, sender)?;

			let event = Event::<T>::OpenChannelAccepted { sender, recipient };
			Self::deposit_event(event);
			Ok(())
		}
		/// Requests closing of the channel identified by `channel_id` on behalf of the calling
		/// parachain.
		///
		/// The origin must be a parachain. On success `ChannelClosed` is deposited.
		#[pallet::call_index(2)]
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
		pub fn hrmp_close_channel(
			origin: OriginFor<T>,
			channel_id: HrmpChannelId,
		) -> DispatchResult {
			let closing_para = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
			Self::close_channel(closing_para, channel_id.clone())?;

			let event = Event::<T>::ChannelClosed { by_parachain: closing_para, channel_id };
			Self::deposit_event(event);
			Ok(())
		}
		/// Forces removal of all HRMP state associated with `para`, closing every channel in
		/// which it is the sender or the recipient.
		///
		/// The origin must satisfy `ChannelManager`. `num_inbound` and `num_outbound` are
		/// witnesses used for weighing: each must be at least the current number of ingress /
		/// egress channels of `para`, otherwise `WrongWitness` is returned.
		#[pallet::call_index(3)]
		#[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
		pub fn force_clean_hrmp(
			origin: OriginFor<T>,
			para: ParaId,
			num_inbound: u32,
			num_outbound: u32,
		) -> DispatchResult {
			T::ChannelManager::ensure_origin(origin)?;

			ensure!(
				HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
					num_inbound as usize,
				Error::<T>::WrongWitness
			);
			ensure!(
				HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
					num_outbound as usize,
				Error::<T>::WrongWitness
			);

			// `clean_hrmp_after_outgoing` takes the para id by reference.
			Self::clean_hrmp_after_outgoing(&para);
			Ok(())
		}
		#[pallet::call_index(4)]
		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
		pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
			T::ChannelManager::ensure_origin(origin)?;
			ensure!(
				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
					channels,
				Error::<T>::WrongWitness
			);
			let host_config = configuration::ActiveConfig::<T>::get();
			Self::process_hrmp_open_channel_requests(&host_config);
			Ok(())
		}
		#[pallet::call_index(5)]
		#[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
		pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
			T::ChannelManager::ensure_origin(origin)?;
			ensure!(
				HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
					channels,
				Error::<T>::WrongWitness
			);
			Self::process_hrmp_close_channel_requests();
			Ok(())
		}
		#[pallet::call_index(6)]
		#[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
		pub fn hrmp_cancel_open_request(
			origin: OriginFor<T>,
			channel_id: HrmpChannelId,
			open_requests: u32,
		) -> DispatchResult {
			let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
			ensure!(
				HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
					open_requests,
				Error::<T>::WrongWitness
			);
			Self::cancel_open_request(origin, channel_id.clone())?;
			Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
			Ok(())
		}
		/// Opens a channel from `sender` to `recipient` without requiring either para to send
		/// a request or a confirmation.
		///
		/// The origin must satisfy `ChannelManager`. If an open request for the same channel
		/// is already pending it is cancelled first, and the post-dispatch weight reflects
		/// whether that happened. On success `HrmpChannelForceOpened` is deposited.
		#[pallet::call_index(7)]
		#[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
		pub fn force_open_hrmp_channel(
			origin: OriginFor<T>,
			sender: ParaId,
			recipient: ParaId,
			max_capacity: u32,
			max_message_size: u32,
		) -> DispatchResultWithPostInfo {
			T::ChannelManager::ensure_origin(origin)?;

			// A pending request for the same channel would make `init_open_channel` fail, so
			// it is cancelled first; the number of cancellations feeds the actual weight.
			let channel_id = HrmpChannelId { sender, recipient };
			let cancelled_requests: u32 = match HrmpOpenChannelRequests::<T>::get(&channel_id) {
				Some(_) => {
					Self::cancel_open_request(sender, channel_id)?;
					1
				},
				None => 0,
			};

			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
			Self::accept_open_channel(recipient, sender)?;

			let event = Event::<T>::HrmpChannelForceOpened {
				sender,
				recipient,
				proposed_max_capacity: max_capacity,
				proposed_max_message_size: max_message_size,
			};
			Self::deposit_event(event);

			let actual_weight =
				<T as Config>::WeightInfo::force_open_hrmp_channel(cancelled_requests);
			Ok(Some(actual_weight).into())
		}
		#[pallet::call_index(8)]
		#[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
		pub fn establish_system_channel(
			origin: OriginFor<T>,
			sender: ParaId,
			recipient: ParaId,
		) -> DispatchResultWithPostInfo {
			let _caller = ensure_signed(origin)?;
			ensure!(
				sender.is_system() && recipient.is_system(),
				Error::<T>::ChannelCreationNotAuthorized
			);
			let config = configuration::ActiveConfig::<T>::get();
			let max_message_size = config.hrmp_channel_max_message_size;
			let max_capacity = config.hrmp_channel_max_capacity;
			Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
			Self::accept_open_channel(recipient, sender)?;
			Self::deposit_event(Event::HrmpSystemChannelOpened {
				sender,
				recipient,
				proposed_max_capacity: max_capacity,
				proposed_max_message_size: max_message_size,
			});
			Ok(Pays::No.into())
		}
		#[pallet::call_index(9)]
		#[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
		pub fn poke_channel_deposits(
			origin: OriginFor<T>,
			sender: ParaId,
			recipient: ParaId,
		) -> DispatchResult {
			let _caller = ensure_signed(origin)?;
			let channel_id = HrmpChannelId { sender, recipient };
			let is_system = sender.is_system() || recipient.is_system();
			let config = configuration::ActiveConfig::<T>::get();
			let (new_sender_deposit, new_recipient_deposit) = if is_system {
				(0, 0)
			} else {
				(config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
			};
			let _ = HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
				if let Some(ref mut channel) = channel {
					let current_sender_deposit = channel.sender_deposit;
					let current_recipient_deposit = channel.recipient_deposit;
					if current_sender_deposit == new_sender_deposit &&
						current_recipient_deposit == new_recipient_deposit
					{
						return Ok(())
					}
					if current_sender_deposit > new_sender_deposit {
						let amount = current_sender_deposit
							.checked_sub(new_sender_deposit)
							.ok_or(ArithmeticError::Underflow)?;
						T::Currency::unreserve(
							&channel_id.sender.into_account_truncating(),
							amount.try_into().unwrap_or(Zero::zero()),
						);
					} else if current_sender_deposit < new_sender_deposit {
						let amount = new_sender_deposit
							.checked_sub(current_sender_deposit)
							.ok_or(ArithmeticError::Underflow)?;
						T::Currency::reserve(
							&channel_id.sender.into_account_truncating(),
							amount.try_into().unwrap_or(Zero::zero()),
						)?;
					}
					if current_recipient_deposit > new_recipient_deposit {
						let amount = current_recipient_deposit
							.checked_sub(new_recipient_deposit)
							.ok_or(ArithmeticError::Underflow)?;
						T::Currency::unreserve(
							&channel_id.recipient.into_account_truncating(),
							amount.try_into().unwrap_or(Zero::zero()),
						);
					} else if current_recipient_deposit < new_recipient_deposit {
						let amount = new_recipient_deposit
							.checked_sub(current_recipient_deposit)
							.ok_or(ArithmeticError::Underflow)?;
						T::Currency::reserve(
							&channel_id.recipient.into_account_truncating(),
							amount.try_into().unwrap_or(Zero::zero()),
						)?;
					}
					channel.sender_deposit = new_sender_deposit;
					channel.recipient_deposit = new_recipient_deposit;
				} else {
					return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
				}
				Ok(())
			})?;
			Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
			Ok(())
		}
		/// Opens channels in both directions between the calling parachain and
		/// `target_system_chain`.
		///
		/// The origin must be a parachain and the target must be a system chain, otherwise
		/// `ChannelCreationNotAuthorized` is returned. Channel parameters come from
		/// `DefaultChannelSizeAndCapacityWithSystem`. A successful call is free of charge
		/// (`Pays::No`) and deposits one `HrmpSystemChannelOpened` per direction.
		#[pallet::call_index(10)]
		#[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
		pub fn establish_channel_with_system(
			origin: OriginFor<T>,
			target_system_chain: ParaId,
		) -> DispatchResultWithPostInfo {
			let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
			ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);

			let (max_message_size, max_capacity) =
				T::DefaultChannelSizeAndCapacityWithSystem::get();

			// Caller -> system chain first, then the reverse direction.
			let directions = [(sender, target_system_chain), (target_system_chain, sender)];

			// Open (and immediately accept) both channels before depositing any event.
			for (from, to) in directions {
				Self::init_open_channel(from, to, max_capacity, max_message_size)?;
				Self::accept_open_channel(to, from)?;
			}

			for (from, to) in directions {
				Self::deposit_event(Event::<T>::HrmpSystemChannelOpened {
					sender: from,
					recipient: to,
					proposed_max_capacity: max_capacity,
					proposed_max_message_size: max_message_size,
				});
			}

			Ok(Pays::No.into())
		}
	}
}
/// Opens the given channels at genesis.
///
/// Every tuple is `(sender, recipient, max_capacity, max_message_size)`. Each channel is
/// requested and accepted, and afterwards all requests are processed with the active host
/// configuration (read before any request is made).
///
/// # Panics
///
/// Panics if any of the channels cannot be requested or accepted.
fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
	let host_config = configuration::ActiveConfig::<T>::get();

	for channel in preopen_hrmp_channels.iter() {
		let (sender, recipient, max_capacity, max_message_size) = *channel;
		match preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size) {
			Ok(()) => {},
			Err(err) => panic!("failed to initialize the genesis storage: {:?}", err),
		}
	}

	Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
}
/// Requests a channel from `sender` to `recipient` and immediately accepts it on behalf of
/// the recipient.
///
/// The channel itself only comes into existence once the open requests are processed; this
/// function merely leaves a confirmed request behind. Any error from either step is returned.
fn preopen_hrmp_channel<T: Config>(
	sender: ParaId,
	recipient: ParaId,
	max_capacity: u32,
	max_message_size: u32,
) -> DispatchResult {
	Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
	Pallet::<T>::accept_open_channel(recipient, sender)?;
	Ok(())
}
impl<T: Config> Pallet<T> {
	/// Block initialization hook for this pallet. Does nothing and reports zero weight.
	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
		Weight::zero()
	}
	/// Block finalization hook for this pallet. Does nothing.
	pub(crate) fn initializer_finalize() {}
	/// Session-change hook for this pallet.
	///
	/// Performs, in this order and using the *previous* session's configuration
	/// (`notification.prev_config`):
	/// 1. cleanup of all HRMP state belonging to `outgoing_paras`,
	/// 2. processing of the pending open-channel requests,
	/// 3. processing of the pending close-channel requests.
	///
	/// Returns the weight of the cleanup plus the open/close processing weights, the latter
	/// two being parameterized by the number of outgoing paras.
	pub(crate) fn initializer_on_new_session(
		notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
		outgoing_paras: &[ParaId],
	) -> Weight {
		let w1 = Self::perform_outgoing_para_cleanup(&notification.prev_config, outgoing_paras);
		Self::process_hrmp_open_channel_requests(&notification.prev_config);
		Self::process_hrmp_close_channel_requests();
		w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
			outgoing_paras.len() as u32
		))
		.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
			outgoing_paras.len() as u32
		))
	}
	/// Removes all HRMP state that relates to the paras in `outgoing`.
	///
	/// First purges the open-channel requests that involve an outgoing para, then cleans up
	/// the channels and counters of each outgoing para. Returns the accumulated weight.
	fn perform_outgoing_para_cleanup(
		config: &HostConfiguration<BlockNumberFor<T>>,
		outgoing: &[ParaId],
	) -> Weight {
		let mut w = Self::clean_open_channel_requests(config, outgoing);
		for outgoing_para in outgoing {
			// The channel counts must be sampled *before* the cleanup: the cleanup takes both
			// index entries out of storage, so reading them afterwards always yields zero and
			// the cleanup would be weighed as if the para had no channels at all.
			let ingress_count =
				HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
			let egress_count =
				HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;

			Self::clean_hrmp_after_outgoing(outgoing_para);

			w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
				ingress_count,
				egress_count,
			));
		}
		w
	}
	/// Removes every pending open-channel request in which one of the `outgoing` paras
	/// participates.
	///
	/// Deposits are only returned to participants that are *not* outgoing: the sender gets
	/// back the deposit recorded in the request, and — for confirmed requests — the recipient
	/// gets back `config.hrmp_recipient_deposit`. For confirmed requests the recipient's
	/// accepted-request counter is decreased as well.
	pub(crate) fn clean_open_channel_requests(
		config: &HostConfiguration<BlockNumberFor<T>>,
		outgoing: &[ParaId],
	) -> Weight {
		// Split the list into requests touching an outgoing para and all the others.
		let mut affected: Vec<HrmpChannelId> = Vec::new();
		let mut unaffected: Vec<HrmpChannelId> = Vec::new();
		for req_id in HrmpOpenChannelRequestsList::<T>::get() {
			if outgoing.iter().any(|para| req_id.is_participant(*para)) {
				affected.push(req_id);
			} else {
				unaffected.push(req_id);
			}
		}
		HrmpOpenChannelRequestsList::<T>::put(unaffected);

		for req_id in affected {
			// Skip ids that have no request data attached.
			let Some(req_data) = HrmpOpenChannelRequests::<T>::take(&req_id) else { continue };

			let sender_stays = !outgoing.contains(&req_id.sender);
			if sender_stays {
				T::Currency::unreserve(
					&req_id.sender.into_account_truncating(),
					req_data.sender_deposit.unique_saturated_into(),
				);
			}

			if !req_data.confirmed {
				continue
			}

			let recipient_stays = !outgoing.contains(&req_id.recipient);
			if recipient_stays {
				T::Currency::unreserve(
					&req_id.recipient.into_account_truncating(),
					config.hrmp_recipient_deposit.unique_saturated_into(),
				);
			}
			Self::decrease_accepted_channel_request_count(req_id.recipient);
		}

		<T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
	}
	fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
		HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
		HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
		let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
			.into_iter()
			.map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
		let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
			.into_iter()
			.map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
		let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
		to_close.sort();
		to_close.dedup();
		for channel in to_close {
			Self::close_hrmp_channel(&channel);
		}
	}
	/// Turns every *confirmed* open-channel request into a channel.
	///
	/// A channel is only created if both ends are still valid paras; the request is removed
	/// and the request counters are decreased either way. Unconfirmed requests are left
	/// untouched. The recipient deposit recorded in the new channel is zero if either end is
	/// a system chain and `config.hrmp_recipient_deposit` otherwise.
	fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
		let mut pending = HrmpOpenChannelRequestsList::<T>::get();
		if pending.is_empty() {
			return
		}

		// Walk the list back to front: `swap_remove` moves the last element into the freed
		// slot, and everything behind the current index has already been visited.
		for idx in (0..pending.len()).rev() {
			let channel_id = pending[idx].clone();
			let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
				"can't be `None` due to the invariant that the list contains the same items as the set; qed",
			);

			if !request.confirmed {
				continue
			}

			let sender = channel_id.sender;
			let recipient = channel_id.recipient;
			let both_valid = paras::Pallet::<T>::is_valid_para(sender) &&
				paras::Pallet::<T>::is_valid_para(recipient);

			if both_valid {
				let recipient_deposit = if sender.is_system() || recipient.is_system() {
					0
				} else {
					config.hrmp_recipient_deposit
				};
				let channel = HrmpChannel {
					sender_deposit: request.sender_deposit,
					recipient_deposit,
					max_capacity: request.max_capacity,
					max_total_size: request.max_total_size,
					max_message_size: request.max_message_size,
					msg_count: 0,
					total_size: 0,
					mqc_head: None,
				};
				HrmpChannels::<T>::insert(&channel_id, channel);

				// Keep both indices sorted and free of duplicates.
				HrmpIngressChannelsIndex::<T>::mutate(&recipient, |senders| {
					if let Err(pos) = senders.binary_search(&sender) {
						senders.insert(pos, sender);
					}
				});
				HrmpEgressChannelsIndex::<T>::mutate(&sender, |recipients| {
					if let Err(pos) = recipients.binary_search(&recipient) {
						recipients.insert(pos, recipient);
					}
				});
			}

			Self::decrease_open_channel_request_count(sender);
			Self::decrease_accepted_channel_request_count(recipient);

			let _ = pending.swap_remove(idx);
			HrmpOpenChannelRequests::<T>::remove(&channel_id);
		}

		HrmpOpenChannelRequestsList::<T>::put(pending);
	}
	/// Closes every channel that is scheduled for closing and empties the close-request
	/// list and set.
	fn process_hrmp_close_channel_requests() {
		let scheduled = HrmpCloseChannelRequestsList::<T>::take();
		scheduled.iter().for_each(|channel_id| {
			HrmpCloseChannelRequests::<T>::remove(channel_id);
			Self::close_hrmp_channel(channel_id);
		});
	}
	fn close_hrmp_channel(channel_id: &HrmpChannelId) {
		if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
			HrmpChannels::<T>::take(channel_id)
		{
			T::Currency::unreserve(
				&channel_id.sender.into_account_truncating(),
				sender_deposit.unique_saturated_into(),
			);
			T::Currency::unreserve(
				&channel_id.recipient.into_account_truncating(),
				recipient_deposit.unique_saturated_into(),
			);
		}
		HrmpChannelContents::<T>::remove(channel_id);
		HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
			if let Ok(i) = v.binary_search(&channel_id.recipient) {
				v.remove(i);
			}
		});
		HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
			if let Ok(i) = v.binary_search(&channel_id.sender) {
				v.remove(i);
			}
		});
	}
	pub(crate) fn check_hrmp_watermark(
		recipient: ParaId,
		relay_chain_parent_number: BlockNumberFor<T>,
		new_hrmp_watermark: BlockNumberFor<T>,
	) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
		if new_hrmp_watermark == relay_chain_parent_number {
			return Ok(())
		}
		if new_hrmp_watermark > relay_chain_parent_number {
			return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
				new_watermark: new_hrmp_watermark,
				relay_chain_parent_number,
			})
		}
		if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
			if new_hrmp_watermark <= last_watermark {
				return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
					new_watermark: new_hrmp_watermark,
					last_watermark,
				})
			}
		}
		let digest = HrmpChannelDigests::<T>::get(&recipient);
		if !digest
			.binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
			.is_ok()
		{
			return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
				new_watermark: new_hrmp_watermark,
			})
		}
		Ok(())
	}
	/// Returns the block numbers of all digest entries stored for `recipient`, in storage
	/// order.
	pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
		let digest = HrmpChannelDigests::<T>::get(&recipient);
		let mut block_numbers = Vec::with_capacity(digest.len());
		for (block_no, _senders) in digest {
			block_numbers.push(block_no);
		}
		block_numbers
	}
	/// Checks whether the batch of outbound messages from `sender` can be accepted.
	///
	/// The batch must not be longer than `config.hrmp_max_message_num_per_candidate`, the
	/// recipients must be in strictly ascending order, and for every message there must be an
	/// open channel whose message-size, total-size and capacity limits are respected.
	///
	/// Returns the first violation found; `idx` in the error is the position of the
	/// offending message within the batch.
	pub(crate) fn check_outbound_hrmp(
		config: &HostConfiguration<BlockNumberFor<T>>,
		sender: ParaId,
		out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
	) -> Result<(), OutboundHrmpAcceptanceErr> {
		if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
			return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
				sent: out_hrmp_msgs.len() as u32,
				permitted: config.hrmp_max_message_num_per_candidate,
			})
		}

		let mut last_recipient = None::<ParaId>;

		for (idx, out_msg) in
			out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
		{
			// Strictly ascending recipients also imply at most one message per channel.
			match last_recipient {
				Some(last_recipient) if out_msg.recipient <= last_recipient =>
					return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
				_ => last_recipient = Some(out_msg.recipient),
			}

			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
			let Some(channel) = HrmpChannels::<T>::get(&channel_id) else {
				return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx })
			};

			let msg_size = out_msg.data.len() as u32;
			if msg_size > channel.max_message_size {
				return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
					idx,
					msg_size,
					max_size: channel.max_message_size,
				})
			}

			// Saturate instead of wrapping: a wrapped sum would be small and could slip past
			// the limit check below.
			let new_total_size = channel.total_size.saturating_add(msg_size);
			if new_total_size > channel.max_total_size {
				return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
					idx,
					total_size: new_total_size,
					limit: channel.max_total_size,
				})
			}

			let new_msg_count = channel.msg_count.saturating_add(1);
			if new_msg_count > channel.max_capacity {
				return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
					idx,
					count: new_msg_count,
					limit: channel.max_capacity,
				})
			}
		}

		Ok(())
	}
	/// Returns, for every egress channel of `sender`, the recipient together with the
	/// remaining `(message count, total size)` capacity of the channel.
	///
	/// Recipients listed in the egress index without channel metadata are skipped.
	pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
		let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
		let mut remaining = Vec::with_capacity(recipients.len());

		for recipient in recipients {
			let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
				continue
			};
			// Saturate at zero: should the counters ever exceed the limits, the channel must
			// be reported as full rather than (after wrap-around) as almost unlimited.
			remaining.push((
				recipient,
				(
					channel.max_capacity.saturating_sub(channel.msg_count),
					channel.max_total_size.saturating_sub(channel.total_size),
				),
			));
		}

		remaining
	}
	pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) -> Weight {
		let mut weight = Weight::zero();
		let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
			let mut senders = BTreeSet::new();
			let mut leftover = Vec::with_capacity(digest.len());
			for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
				if block_no <= new_hrmp_watermark {
					senders.extend(paras_sent_msg);
				} else {
					leftover.push((block_no, paras_sent_msg));
				}
			}
			*digest = leftover;
			senders
		});
		weight += T::DbWeight::get().reads_writes(1, 1);
		let channels_to_prune =
			senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
		for channel_id in channels_to_prune {
			let (mut pruned_cnt, mut pruned_size) = (0, 0);
			let contents = HrmpChannelContents::<T>::get(&channel_id);
			let mut leftover = Vec::with_capacity(contents.len());
			for msg in contents {
				if msg.sent_at <= new_hrmp_watermark {
					pruned_cnt += 1;
					pruned_size += msg.data.len();
				} else {
					leftover.push(msg);
				}
			}
			if !leftover.is_empty() {
				HrmpChannelContents::<T>::insert(&channel_id, leftover);
			} else {
				HrmpChannelContents::<T>::remove(&channel_id);
			}
			HrmpChannels::<T>::mutate(&channel_id, |channel| {
				if let Some(ref mut channel) = channel {
					channel.msg_count -= pruned_cnt as u32;
					channel.total_size -= pruned_size as u32;
				}
			});
			weight += T::DbWeight::get().reads_writes(2, 2);
		}
		HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
		weight += T::DbWeight::get().reads_writes(0, 1);
		weight
	}
	/// Queues the given messages from `sender` into the respective channels.
	///
	/// For every message with an existing channel this bumps the channel's counters,
	/// advances its message-queue-chain head, appends the message to the channel contents
	/// and records the sender in the recipient's digest for the current block. Messages
	/// without a channel are silently skipped. Returns the database weight consumed.
	pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) -> Weight {
		let now = frame_system::Pallet::<T>::block_number();
		let mut weight = Weight::zero();

		for out_msg in out_hrmp_msgs {
			let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
			let Some(mut channel) = HrmpChannels::<T>::get(&channel_id) else { continue };

			let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };

			// Account for the message in the channel metadata.
			channel.msg_count += 1;
			channel.total_size += inbound.data.len() as u32;

			// The new head commits to the previous head, the send time and the payload.
			let prev_head = channel.mqc_head.unwrap_or_default();
			let data_hash = T::Hashing::hash_of(&inbound.data);
			let new_head = BlakeTwo256::hash_of(&(prev_head, inbound.sent_at, data_hash));
			channel.mqc_head = Some(new_head);

			HrmpChannels::<T>::insert(&channel_id, channel);
			HrmpChannelContents::<T>::append(&channel_id, inbound);

			// Add the sender to the digest entry of the current block, creating the entry if
			// the last one belongs to a different block (or there is none).
			let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
			match recipient_digest.last_mut() {
				Some((block_no, senders)) if *block_no == now => senders.push(sender),
				_ => recipient_digest.push((now, vec![sender])),
			}
			HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);

			weight += T::DbWeight::get().reads_writes(2, 2);
		}

		weight
	}
	pub fn init_open_channel(
		origin: ParaId,
		recipient: ParaId,
		proposed_max_capacity: u32,
		proposed_max_message_size: u32,
	) -> DispatchResult {
		ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
		ensure!(
			paras::Pallet::<T>::is_valid_para(recipient),
			Error::<T>::OpenHrmpChannelInvalidRecipient,
		);
		let config = configuration::ActiveConfig::<T>::get();
		ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
		ensure!(
			proposed_max_capacity <= config.hrmp_channel_max_capacity,
			Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
		);
		ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
		ensure!(
			proposed_max_message_size <= config.hrmp_channel_max_message_size,
			Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
		);
		let channel_id = HrmpChannelId { sender: origin, recipient };
		ensure!(
			HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyRequested,
		);
		ensure!(
			HrmpChannels::<T>::get(&channel_id).is_none(),
			Error::<T>::OpenHrmpChannelAlreadyExists,
		);
		let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
		let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
		let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
		ensure!(
			egress_cnt + open_req_cnt < channel_num_limit,
			Error::<T>::OpenHrmpChannelLimitExceeded,
		);
		let is_system = origin.is_system() || recipient.is_system();
		let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
		if !deposit.is_zero() {
			T::Currency::reserve(
				&origin.into_account_truncating(),
				deposit.unique_saturated_into(),
			)?;
		}
		HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
		HrmpOpenChannelRequests::<T>::insert(
			&channel_id,
			HrmpOpenChannelRequest {
				confirmed: false,
				_age: 0,
				sender_deposit: deposit,
				max_capacity: proposed_max_capacity,
				max_message_size: proposed_max_message_size,
				max_total_size: config.hrmp_channel_max_total_size,
			},
		);
		HrmpOpenChannelRequestsList::<T>::append(channel_id);
		let notification_bytes = {
			use parity_scale_codec::Encode as _;
			use xcm::opaque::{latest::prelude::*, VersionedXcm};
			VersionedXcm::from(Xcm(vec![HrmpNewChannelOpenRequest {
				sender: u32::from(origin),
				max_capacity: proposed_max_capacity,
				max_message_size: proposed_max_message_size,
			}]))
			.encode()
		};
		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
			dmp::Pallet::<T>::queue_downward_message(&config, recipient, notification_bytes)
		{
			log::error!(
				target: "runtime::hrmp",
				"sending 'init_open_channel::notification_bytes' failed."
			);
			debug_assert!(false);
		}
		Ok(())
	}
	pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
		let channel_id = HrmpChannelId { sender, recipient: origin };
		let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
			.ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
		ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
		let config = configuration::ActiveConfig::<T>::get();
		let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
		let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
		let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
		ensure!(
			ingress_cnt + accepted_cnt < channel_num_limit,
			Error::<T>::AcceptHrmpChannelLimitExceeded,
		);
		let is_system = origin.is_system() || sender.is_system();
		let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
		if !deposit.is_zero() {
			T::Currency::reserve(
				&origin.into_account_truncating(),
				deposit.unique_saturated_into(),
			)?;
		}
		channel_req.confirmed = true;
		HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
		HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
		let notification_bytes = {
			use parity_scale_codec::Encode as _;
			use xcm::opaque::{latest::prelude::*, VersionedXcm};
			let xcm = Xcm(vec![HrmpChannelAccepted { recipient: u32::from(origin) }]);
			VersionedXcm::from(xcm).encode()
		};
		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
			dmp::Pallet::<T>::queue_downward_message(&config, sender, notification_bytes)
		{
			log::error!(
				target: "runtime::hrmp",
				"sending 'accept_open_channel::notification_bytes' failed."
			);
			debug_assert!(false);
		}
		Ok(())
	}
	fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
		ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
		let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
			.ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
		ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
		HrmpOpenChannelRequests::<T>::remove(&channel_id);
		HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
			if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
				open_req_channels.swap_remove(pos);
			}
		});
		Self::decrease_open_channel_request_count(channel_id.sender);
		T::Currency::unreserve(
			&channel_id.sender.into_account_truncating(),
			open_channel_req.sender_deposit.unique_saturated_into(),
		);
		Ok(())
	}
	/// Registers a request to close the channel `channel_id` on behalf of `origin`.
	///
	/// `origin` must be a participant of the channel. The close request is recorded in storage
	/// and in the close request list, and a `HrmpChannelClosing` XCM notification is queued as
	/// a downward message to the other participant of the channel.
	///
	/// # Errors
	///
	/// - `CloseHrmpChannelUnauthorized` if `origin` is not a participant of the channel.
	/// - `CloseHrmpChannelDoesntExist` if the channel does not exist.
	/// - `CloseHrmpChannelAlreadyUnderway` if a close request for the channel already exists.
	fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
		ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
		ensure!(
			HrmpChannels::<T>::get(&channel_id).is_some(),
			Error::<T>::CloseHrmpChannelDoesntExist,
		);
		ensure!(
			HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
			Error::<T>::CloseHrmpChannelAlreadyUnderway,
		);

		// The notification goes to whichever participant did not initiate the closing.
		let counterparty =
			if channel_id.sender != origin { channel_id.sender } else { channel_id.recipient };

		// Record the close request in the map and in the list that mirrors it.
		HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
		HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());

		let host_config = configuration::ActiveConfig::<T>::get();
		let notification_bytes = {
			use parity_scale_codec::Encode as _;
			use xcm::opaque::{latest::prelude::*, VersionedXcm};

			let xcm = Xcm(vec![HrmpChannelClosing {
				initiator: u32::from(origin),
				sender: u32::from(channel_id.sender),
				recipient: u32::from(channel_id.recipient),
			}]);
			VersionedXcm::from(xcm).encode()
		};
		let queued = dmp::Pallet::<T>::queue_downward_message(
			&host_config,
			counterparty,
			notification_bytes,
		);
		if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) = queued {
			// The close request itself stays registered; the failure is only logged.
			log::error!(
				target: "runtime::hrmp",
				"sending 'close_channel::notification_bytes' failed."
			);
			debug_assert!(false);
		}

		Ok(())
	}
	/// Returns the MQC head of every ingress channel of `recipient`, paired with the sender of
	/// that channel.
	///
	/// A channel that has no metadata or no MQC head yet is reported with the default hash.
	#[cfg(test)]
	fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
		HrmpIngressChannelsIndex::<T>::get(&recipient)
			.into_iter()
			.map(|sender| {
				let head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
					.and_then(|metadata| metadata.mqc_head)
					.unwrap_or_default();
				(sender, head)
			})
			.collect()
	}
	/// Returns the stored contents of every ingress channel of `recipient`, keyed by the sender
	/// of the channel.
	///
	/// Every sender listed in the ingress index of `recipient` gets an entry, holding whatever
	/// the channel contents storage returns for that channel.
	pub(crate) fn inbound_hrmp_channels_contents(
		recipient: ParaId,
	) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
		HrmpIngressChannelsIndex::<T>::get(&recipient)
			.into_iter()
			.map(|sender| {
				let channel_id = HrmpChannelId { sender, recipient };
				(sender, HrmpChannelContents::<T>::get(&channel_id))
			})
			.collect()
	}
}
impl<T: Config> Pallet<T> {
	/// Decrements the open channel request counter of `sender` by one.
	///
	/// The storage entry is removed when the counter reaches zero; a missing entry is left
	/// untouched.
	fn decrease_open_channel_request_count(sender: ParaId) {
		HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
				0 => None,
				n => Some(n),
			});
		});
	}

	/// Decrements the accepted channel request counter of `recipient` by one.
	///
	/// The storage entry is removed when the counter reaches zero; a missing entry is left
	/// untouched.
	fn decrease_accepted_channel_request_count(recipient: ParaId) {
		HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
			*opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
				0 => None,
				n => Some(n),
			});
		});
	}

	/// Exhaustively checks that the HRMP storage items are consistent with each other.
	///
	/// Only compiled for tests and runtime benchmarks.
	///
	/// # Panics
	///
	/// Panics as soon as one of the checked invariants does not hold.
	#[cfg(any(feature = "runtime-benchmarks", test))]
	fn assert_storage_consistency_exhaustive() {
		fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
			assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
		}

		let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
			for para in paras {
				assert!(
					crate::paras::Pallet::<T>::is_valid_para(para),
					"{}: {:?} para is offboarded",
					cause,
					para
				);
			}
		};

		// The open request map and the open request list must hold the same channel ids.
		assert_eq!(
			HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
			HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
		);

		// Exactly the senders of open requests have an open request counter...
		assert_eq!(
			HrmpOpenChannelRequestCount::<T>::iter()
				.map(|(k, _)| k)
				.collect::<BTreeSet<_>>(),
			HrmpOpenChannelRequests::<T>::iter()
				.map(|(k, _)| k.sender)
				.collect::<BTreeSet<_>>(),
		);
		// ...and each counter matches the number of requests made by that sender.
		for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
			let actual_num = HrmpOpenChannelRequests::<T>::iter()
				.filter(|(ch, _)| ch.sender == open_channel_initiator)
				.count() as u32;
			assert_eq!(expected_num, actual_num);
		}

		// Exactly the recipients of confirmed requests have an accepted request counter...
		assert_eq!(
			HrmpAcceptedChannelRequestCount::<T>::iter()
				.map(|(k, _)| k)
				.collect::<BTreeSet<_>>(),
			HrmpOpenChannelRequests::<T>::iter()
				.filter(|(_, v)| v.confirmed)
				.map(|(k, _)| k.recipient)
				.collect::<BTreeSet<_>>(),
		);
		// ...and each counter matches the number of confirmed requests for that recipient.
		for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
			let actual_num = HrmpOpenChannelRequests::<T>::iter()
				.filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
				.count() as u32;
			assert_eq!(expected_num, actual_num);
		}

		// The close request map and the close request list must hold the same channel ids.
		assert_eq!(
			HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
			HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
		);

		assert_contains_only_onboarded(
			HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
			"HRMP watermarks should contain only onboarded paras",
		);

		// Stored channel contents must belong to an existing channel and must not be empty.
		for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
			assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
			assert!(!contents.is_empty());
		}

		assert_contains_only_onboarded(
			HrmpChannels::<T>::iter()
				.flat_map(|(k, _)| vec![k.sender, k.recipient])
				.collect::<Vec<_>>(),
			"senders and recipients in all channels should be onboarded",
		);

		// The ingress index, the egress index and the channel map must all describe the same
		// set of (sender, recipient) pairs.
		let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
			.flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
			.collect::<BTreeSet<_>>();
		let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
			.flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
			.collect::<BTreeSet<_>>();
		let channel_set_ground_truth = HrmpChannels::<T>::iter()
			.map(|(k, _)| (k.sender, k.recipient))
			.collect::<BTreeSet<_>>();
		assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
		assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);

		// Both index vectors must be sorted; the label names the storage item actually checked.
		HrmpIngressChannelsIndex::<T>::iter()
			.map(|(_, v)| v)
			.for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
		HrmpEgressChannelsIndex::<T>::iter()
			.map(|(_, v)| v)
			.for_each(|v| assert_is_sorted(&v, "HrmpEgressChannelsIndex"));

		assert_contains_only_onboarded(
			HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
			"HRMP channel digests should contain only onboarded paras",
		);
		for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
			// Digest entries must be strictly ascending by block number.
			assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));

			for (_, mut senders) in digest {
				assert!(!senders.is_empty());

				// Sorting and deduplicating must not change the list, i.e. a sender may appear
				// at most once per digest entry.
				senders.sort();
				let orig_senders = senders.clone();
				senders.dedup();
				assert_eq!(
					orig_senders, senders,
					"duplicates removed implies existence of duplicates"
				);
			}
		}
	}
}