1use crate::{
18 configuration::{self, HostConfiguration},
19 dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22 collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23 vec,
24 vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32 Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33 SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37 traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38 ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Upper bound for the number of inbound HRMP channels.
///
/// NOTE(review): not referenced in the visible part of this file; presumably used elsewhere to
/// cap the corresponding configuration value and keep the weights of this pallet meaningful —
/// confirm against the users of this constant.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Upper bound for the number of outbound HRMP channels.
///
/// NOTE(review): same caveat as for [`HRMP_MAX_INBOUND_CHANNELS_BOUND`] — confirm the users.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
57pub trait WeightInfo {
58 fn hrmp_init_open_channel() -> Weight;
59 fn hrmp_accept_open_channel() -> Weight;
60 fn hrmp_close_channel() -> Weight;
61 fn force_clean_hrmp(i: u32, e: u32) -> Weight;
62 fn force_process_hrmp_open(c: u32) -> Weight;
63 fn force_process_hrmp_close(c: u32) -> Weight;
64 fn hrmp_cancel_open_request(c: u32) -> Weight;
65 fn clean_open_channel_requests(c: u32) -> Weight;
66 fn force_open_hrmp_channel(c: u32) -> Weight;
67 fn establish_system_channel() -> Weight;
68 fn poke_channel_deposits() -> Weight;
69 fn establish_channel_with_system() -> Weight;
70}
71
72pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76 fn hrmp_accept_open_channel() -> Weight {
77 Weight::MAX
78 }
79 fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80 Weight::MAX
81 }
82 fn force_process_hrmp_close(_: u32) -> Weight {
83 Weight::MAX
84 }
85 fn force_process_hrmp_open(_: u32) -> Weight {
86 Weight::MAX
87 }
88 fn hrmp_cancel_open_request(_: u32) -> Weight {
89 Weight::MAX
90 }
91 fn hrmp_close_channel() -> Weight {
92 Weight::MAX
93 }
94 fn hrmp_init_open_channel() -> Weight {
95 Weight::MAX
96 }
97 fn clean_open_channel_requests(_: u32) -> Weight {
98 Weight::MAX
99 }
100 fn force_open_hrmp_channel(_: u32) -> Weight {
101 Weight::MAX
102 }
103 fn establish_system_channel() -> Weight {
104 Weight::MAX
105 }
106 fn poke_channel_deposits() -> Weight {
107 Weight::MAX
108 }
109 fn establish_channel_with_system() -> Weight {
110 Weight::MAX
111 }
112}
113
114#[derive(Encode, Decode, TypeInfo)]
116pub struct HrmpOpenChannelRequest {
117 pub confirmed: bool,
119 pub _age: SessionIndex,
122 pub sender_deposit: Balance,
124 pub max_message_size: u32,
126 pub max_capacity: u32,
128 pub max_total_size: u32,
130}
131
132#[derive(Encode, Decode, TypeInfo)]
134#[cfg_attr(test, derive(Debug))]
135pub struct HrmpChannel {
136 pub max_capacity: u32,
144 pub max_total_size: u32,
146 pub max_message_size: u32,
148 pub msg_count: u32,
151 pub total_size: u32,
154 pub mqc_head: Option<Hash>,
162 pub sender_deposit: Balance,
164 pub recipient_deposit: Balance,
166}
167
/// The reasons for which a proposed HRMP watermark can be rejected.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
    /// The proposed watermark does not advance past the previously stored one.
    AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
    /// The proposed watermark is greater than the relay-parent block number.
    AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
    /// The proposed watermark does not match any block in which the recipient received
    /// messages.
    LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
176pub(crate) enum OutboundHrmpAcceptanceErr {
179 MoreMessagesThanPermitted { sent: u32, permitted: u32 },
180 NotSorted { idx: u32 },
181 NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
182 MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
183 TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
184 CapacityExceeded { idx: u32, count: u32, limit: u32 },
185}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189 BlockNumber: fmt::Debug,
190{
191 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192 use HrmpWatermarkAcceptanceErr::*;
193 match self {
194 AdvancementRule { new_watermark, last_watermark } => write!(
195 fmt,
196 "the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197 new_watermark, last_watermark,
198 ),
199 AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200 fmt,
201 "the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202 new_watermark, relay_chain_parent_number
203 ),
204 LandsOnBlockWithNoMessages { new_watermark } => write!(
205 fmt,
206 "the HRMP watermark ({:?}) doesn't land on a block with messages received",
207 new_watermark
208 ),
209 }
210 }
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215 use OutboundHrmpAcceptanceErr::*;
216 match self {
217 MoreMessagesThanPermitted { sent, permitted } => write!(
218 fmt,
219 "more HRMP messages than permitted by config ({} > {})",
220 sent, permitted,
221 ),
222 NotSorted { idx } => {
223 write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224 },
225 NoSuchChannel { idx, channel_id } => write!(
226 fmt,
227 "the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228 idx, channel_id.sender, channel_id.recipient,
229 ),
230 MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231 fmt,
232 "the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233 idx, msg_size, max_size,
234 ),
235 TotalSizeExceeded { idx, total_size, limit } => write!(
236 fmt,
237 "sending the HRMP message at index {} would exceed the negotiated channel total size ({} > {})",
238 idx, total_size, limit,
239 ),
240 CapacityExceeded { idx, count, limit } => write!(
241 fmt,
242 "sending the HRMP message at index {} would exceed the negotiated channel capacity ({} > {})",
243 idx, count, limit,
244 ),
245 }
246 }
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251 use super::*;
252
253 #[pallet::pallet]
254 #[pallet::without_storage_info]
255 pub struct Pallet<T>(_);
256
257 #[pallet::config]
258 pub trait Config:
259 frame_system::Config + configuration::Config + paras::Config + dmp::Config
260 {
261 #[allow(deprecated)]
263 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
264
265 type RuntimeOrigin: From<crate::Origin>
266 + From<<Self as frame_system::Config>::RuntimeOrigin>
267 + Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;
268
269 type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;
271
272 type Currency: ReservableCurrency<Self::AccountId>;
278
279 type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;
282
283 type VersionWrapper: xcm::WrapVersion;
290
291 type WeightInfo: WeightInfo;
293 }
294
295 #[pallet::event]
296 #[pallet::generate_deposit(pub(super) fn deposit_event)]
297 pub enum Event<T: Config> {
298 OpenChannelRequested {
300 sender: ParaId,
301 recipient: ParaId,
302 proposed_max_capacity: u32,
303 proposed_max_message_size: u32,
304 },
305 OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
307 OpenChannelAccepted { sender: ParaId, recipient: ParaId },
309 ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
311 HrmpChannelForceOpened {
313 sender: ParaId,
314 recipient: ParaId,
315 proposed_max_capacity: u32,
316 proposed_max_message_size: u32,
317 },
318 HrmpSystemChannelOpened {
320 sender: ParaId,
321 recipient: ParaId,
322 proposed_max_capacity: u32,
323 proposed_max_message_size: u32,
324 },
325 OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
327 }
328
329 #[pallet::error]
330 pub enum Error<T> {
331 OpenHrmpChannelToSelf,
333 OpenHrmpChannelInvalidRecipient,
335 OpenHrmpChannelZeroCapacity,
337 OpenHrmpChannelCapacityExceedsLimit,
339 OpenHrmpChannelZeroMessageSize,
341 OpenHrmpChannelMessageSizeExceedsLimit,
343 OpenHrmpChannelAlreadyExists,
345 OpenHrmpChannelAlreadyRequested,
347 OpenHrmpChannelLimitExceeded,
349 AcceptHrmpChannelDoesntExist,
351 AcceptHrmpChannelAlreadyConfirmed,
353 AcceptHrmpChannelLimitExceeded,
355 CloseHrmpChannelUnauthorized,
357 CloseHrmpChannelDoesntExist,
359 CloseHrmpChannelAlreadyUnderway,
361 CancelHrmpOpenChannelUnauthorized,
363 OpenHrmpChannelDoesntExist,
365 OpenHrmpChannelAlreadyConfirmed,
367 WrongWitness,
369 ChannelCreationNotAuthorized,
371 }
372
373 #[pallet::storage]
380 pub type HrmpOpenChannelRequests<T: Config> =
381 StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;
382
383 #[pallet::storage]
387 pub type HrmpOpenChannelRequestsList<T: Config> =
388 StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
389
390 #[pallet::storage]
394 pub type HrmpOpenChannelRequestCount<T: Config> =
395 StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
396
397 #[pallet::storage]
401 pub type HrmpAcceptedChannelRequestCount<T: Config> =
402 StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
403
404 #[pallet::storage]
412 pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;
413
414 #[pallet::storage]
415 pub type HrmpCloseChannelRequestsList<T: Config> =
416 StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
417
418 #[pallet::storage]
423 pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
424
425 #[pallet::storage]
429 pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
430
431 #[pallet::storage]
445 pub type HrmpIngressChannelsIndex<T: Config> =
446 StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
447
448 #[pallet::storage]
451 pub type HrmpEgressChannelsIndex<T: Config> =
452 StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
453
454 #[pallet::storage]
457 pub type HrmpChannelContents<T: Config> = StorageMap<
458 _,
459 Twox64Concat,
460 HrmpChannelId,
461 Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
462 ValueQuery,
463 >;
464
465 #[pallet::storage]
472 pub type HrmpChannelDigests<T: Config> =
473 StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
474
475 #[pallet::genesis_config]
489 #[derive(DefaultNoBound)]
490 pub struct GenesisConfig<T: Config> {
491 #[serde(skip)]
492 _config: core::marker::PhantomData<T>,
493 preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
494 }
495
496 #[pallet::genesis_build]
497 impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
498 fn build(&self) {
499 initialize_storage::<T>(&self.preopen_hrmp_channels);
500 }
501 }
502
503 #[pallet::call]
504 impl<T: Config> Pallet<T> {
505 #[pallet::call_index(0)]
516 #[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
517 pub fn hrmp_init_open_channel(
518 origin: OriginFor<T>,
519 recipient: ParaId,
520 proposed_max_capacity: u32,
521 proposed_max_message_size: u32,
522 ) -> DispatchResult {
523 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
524 Self::init_open_channel(
525 origin,
526 recipient,
527 proposed_max_capacity,
528 proposed_max_message_size,
529 )?;
530 Self::deposit_event(Event::OpenChannelRequested {
531 sender: origin,
532 recipient,
533 proposed_max_capacity,
534 proposed_max_message_size,
535 });
536 Ok(())
537 }
538
539 #[pallet::call_index(1)]
543 #[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
544 pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
545 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
546 Self::accept_open_channel(origin, sender)?;
547 Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
548 Ok(())
549 }
550
551 #[pallet::call_index(2)]
556 #[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
557 pub fn hrmp_close_channel(
558 origin: OriginFor<T>,
559 channel_id: HrmpChannelId,
560 ) -> DispatchResult {
561 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
562 Self::close_channel(origin, channel_id.clone())?;
563 Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
564 Ok(())
565 }
566
567 #[pallet::call_index(3)]
575 #[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
576 pub fn force_clean_hrmp(
577 origin: OriginFor<T>,
578 para: ParaId,
579 num_inbound: u32,
580 num_outbound: u32,
581 ) -> DispatchResult {
582 T::ChannelManager::ensure_origin(origin)?;
583
584 ensure!(
585 HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
586 num_inbound as usize,
587 Error::<T>::WrongWitness
588 );
589 ensure!(
590 HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
591 num_outbound as usize,
592 Error::<T>::WrongWitness
593 );
594
595 Self::clean_hrmp_after_outgoing(¶);
596 Ok(())
597 }
598
599 #[pallet::call_index(4)]
608 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
609 pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
610 T::ChannelManager::ensure_origin(origin)?;
611
612 ensure!(
613 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
614 channels,
615 Error::<T>::WrongWitness
616 );
617
618 let host_config = configuration::ActiveConfig::<T>::get();
619 Self::process_hrmp_open_channel_requests(&host_config);
620 Ok(())
621 }
622
623 #[pallet::call_index(5)]
632 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
633 pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
634 T::ChannelManager::ensure_origin(origin)?;
635
636 ensure!(
637 HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
638 channels,
639 Error::<T>::WrongWitness
640 );
641
642 Self::process_hrmp_close_channel_requests();
643 Ok(())
644 }
645
646 #[pallet::call_index(6)]
655 #[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
656 pub fn hrmp_cancel_open_request(
657 origin: OriginFor<T>,
658 channel_id: HrmpChannelId,
659 open_requests: u32,
660 ) -> DispatchResult {
661 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
662 ensure!(
663 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
664 open_requests,
665 Error::<T>::WrongWitness
666 );
667 Self::cancel_open_request(origin, channel_id.clone())?;
668 Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
669 Ok(())
670 }
671
672 #[pallet::call_index(7)]
681 #[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
682 pub fn force_open_hrmp_channel(
683 origin: OriginFor<T>,
684 sender: ParaId,
685 recipient: ParaId,
686 max_capacity: u32,
687 max_message_size: u32,
688 ) -> DispatchResultWithPostInfo {
689 T::ChannelManager::ensure_origin(origin)?;
690
691 let channel_id = HrmpChannelId { sender, recipient };
696 let cancel_request: u32 =
697 if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
698 Self::cancel_open_request(sender, channel_id)?;
699 1
700 } else {
701 0
702 };
703
704 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
707 Self::accept_open_channel(recipient, sender)?;
708 Self::deposit_event(Event::HrmpChannelForceOpened {
709 sender,
710 recipient,
711 proposed_max_capacity: max_capacity,
712 proposed_max_message_size: max_message_size,
713 });
714
715 Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
716 }
717
718 #[pallet::call_index(8)]
731 #[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
732 pub fn establish_system_channel(
733 origin: OriginFor<T>,
734 sender: ParaId,
735 recipient: ParaId,
736 ) -> DispatchResultWithPostInfo {
737 let _caller = ensure_signed(origin)?;
738
739 ensure!(
741 sender.is_system() && recipient.is_system(),
742 Error::<T>::ChannelCreationNotAuthorized
743 );
744
745 let config = configuration::ActiveConfig::<T>::get();
746 let max_message_size = config.hrmp_channel_max_message_size;
747 let max_capacity = config.hrmp_channel_max_capacity;
748
749 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
750 Self::accept_open_channel(recipient, sender)?;
751
752 Self::deposit_event(Event::HrmpSystemChannelOpened {
753 sender,
754 recipient,
755 proposed_max_capacity: max_capacity,
756 proposed_max_message_size: max_message_size,
757 });
758
759 Ok(Pays::No.into())
760 }
761
762 #[pallet::call_index(9)]
772 #[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
773 pub fn poke_channel_deposits(
774 origin: OriginFor<T>,
775 sender: ParaId,
776 recipient: ParaId,
777 ) -> DispatchResult {
778 let _caller = ensure_signed(origin)?;
779 let channel_id = HrmpChannelId { sender, recipient };
780 let is_system = sender.is_system() || recipient.is_system();
781
782 let config = configuration::ActiveConfig::<T>::get();
783
784 let (new_sender_deposit, new_recipient_deposit) = if is_system {
786 (0, 0)
787 } else {
788 (config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
789 };
790
791 HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
792 if let Some(ref mut channel) = channel {
793 let current_sender_deposit = channel.sender_deposit;
794 let current_recipient_deposit = channel.recipient_deposit;
795
796 if current_sender_deposit == new_sender_deposit &&
798 current_recipient_deposit == new_recipient_deposit
799 {
800 return Ok(())
801 }
802
803 if current_sender_deposit > new_sender_deposit {
805 let amount = current_sender_deposit
807 .checked_sub(new_sender_deposit)
808 .ok_or(ArithmeticError::Underflow)?;
809 T::Currency::unreserve(
810 &channel_id.sender.into_account_truncating(),
811 amount.try_into().unwrap_or(Zero::zero()),
814 );
815 } else if current_sender_deposit < new_sender_deposit {
816 let amount = new_sender_deposit
817 .checked_sub(current_sender_deposit)
818 .ok_or(ArithmeticError::Underflow)?;
819 T::Currency::reserve(
820 &channel_id.sender.into_account_truncating(),
821 amount.try_into().unwrap_or(Zero::zero()),
822 )?;
823 }
824
825 if current_recipient_deposit > new_recipient_deposit {
827 let amount = current_recipient_deposit
828 .checked_sub(new_recipient_deposit)
829 .ok_or(ArithmeticError::Underflow)?;
830 T::Currency::unreserve(
831 &channel_id.recipient.into_account_truncating(),
832 amount.try_into().unwrap_or(Zero::zero()),
833 );
834 } else if current_recipient_deposit < new_recipient_deposit {
835 let amount = new_recipient_deposit
836 .checked_sub(current_recipient_deposit)
837 .ok_or(ArithmeticError::Underflow)?;
838 T::Currency::reserve(
839 &channel_id.recipient.into_account_truncating(),
840 amount.try_into().unwrap_or(Zero::zero()),
841 )?;
842 }
843
844 channel.sender_deposit = new_sender_deposit;
846 channel.recipient_deposit = new_recipient_deposit;
847 } else {
848 return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
849 }
850 Ok(())
851 })?;
852
853 Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
854
855 Ok(())
856 }
857
858 #[pallet::call_index(10)]
866 #[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
867 pub fn establish_channel_with_system(
868 origin: OriginFor<T>,
869 target_system_chain: ParaId,
870 ) -> DispatchResultWithPostInfo {
871 let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
872
873 ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
874
875 let (max_message_size, max_capacity) =
876 T::DefaultChannelSizeAndCapacityWithSystem::get();
877
878 Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
880 Self::accept_open_channel(target_system_chain, sender)?;
881
882 Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
883 Self::accept_open_channel(sender, target_system_chain)?;
884
885 Self::deposit_event(Event::HrmpSystemChannelOpened {
886 sender,
887 recipient: target_system_chain,
888 proposed_max_capacity: max_capacity,
889 proposed_max_message_size: max_message_size,
890 });
891
892 Self::deposit_event(Event::HrmpSystemChannelOpened {
893 sender: target_system_chain,
894 recipient: sender,
895 proposed_max_capacity: max_capacity,
896 proposed_max_message_size: max_message_size,
897 });
898
899 Ok(Pays::No.into())
900 }
901 }
902}
903
904fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
905 let host_config = configuration::ActiveConfig::<T>::get();
906 for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
907 if let Err(err) =
908 preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
909 {
910 panic!("failed to initialize the genesis storage: {:?}", err);
911 }
912 }
913 Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
914}
915
916fn preopen_hrmp_channel<T: Config>(
917 sender: ParaId,
918 recipient: ParaId,
919 max_capacity: u32,
920 max_message_size: u32,
921) -> DispatchResult {
922 Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
923 Pallet::<T>::accept_open_channel(recipient, sender)?;
924 Ok(())
925}
926
927impl<T: Config> Pallet<T> {
929 pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
931 Weight::zero()
932 }
933
/// Block finalization hook of this pallet. Does nothing.
pub(crate) fn initializer_finalize() {}
936
937 pub(crate) fn initializer_on_new_session(
939 notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
940 outgoing_paras: &[ParaId],
941 ) -> Weight {
942 let w1 = Self::perform_outgoing_para_cleanup(¬ification.prev_config, outgoing_paras);
943 Self::process_hrmp_open_channel_requests(¬ification.prev_config);
944 Self::process_hrmp_close_channel_requests();
945 w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
946 outgoing_paras.len() as u32
947 ))
948 .saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
949 outgoing_paras.len() as u32,
950 ))
951 }
952
953 fn perform_outgoing_para_cleanup(
956 config: &HostConfiguration<BlockNumberFor<T>>,
957 outgoing: &[ParaId],
958 ) -> Weight {
959 let mut w = Self::clean_open_channel_requests(config, outgoing);
960 for outgoing_para in outgoing {
961 Self::clean_hrmp_after_outgoing(outgoing_para);
962
963 let ingress_count =
966 HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
967 let egress_count =
968 HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
969 w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
970 ingress_count,
971 egress_count,
972 ));
973 }
974 w
975 }
976
977 pub(crate) fn clean_open_channel_requests(
981 config: &HostConfiguration<BlockNumberFor<T>>,
982 outgoing: &[ParaId],
983 ) -> Weight {
984 let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
990 let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
991 .into_iter()
992 .partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
993 HrmpOpenChannelRequestsList::<T>::put(stay);
994
995 for req_id in go {
998 let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
999 Some(req_data) => req_data,
1000 None => {
1001 continue
1003 },
1004 };
1005
1006 if !outgoing.contains(&req_id.sender) {
1008 T::Currency::unreserve(
1009 &req_id.sender.into_account_truncating(),
1010 req_data.sender_deposit.unique_saturated_into(),
1011 );
1012 }
1013
1014 if req_data.confirmed {
1020 if !outgoing.contains(&req_id.recipient) {
1021 T::Currency::unreserve(
1022 &req_id.recipient.into_account_truncating(),
1023 config.hrmp_recipient_deposit.unique_saturated_into(),
1024 );
1025 }
1026 Self::decrease_accepted_channel_request_count(req_id.recipient);
1027 }
1028 }
1029
1030 <T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1031 }
1032
1033 fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1035 HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1036 HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1037
1038 let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1039 .into_iter()
1040 .map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1041 let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1042 .into_iter()
1043 .map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1044 let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1045 to_close.sort();
1046 to_close.dedup();
1047
1048 for channel in to_close {
1049 Self::close_hrmp_channel(&channel);
1050 }
1051 }
1052
1053 fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1058 let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1059 if open_req_channels.is_empty() {
1060 return
1061 }
1062
1063 let mut idx = open_req_channels.len();
1066 loop {
1067 if idx == 0 {
1069 break
1070 }
1071
1072 idx -= 1;
1073 let channel_id = open_req_channels[idx].clone();
1074 let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1075 "can't be `None` due to the invariant that the list contains the same items as the set; qed",
1076 );
1077
1078 let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1079 let sender_deposit = request.sender_deposit;
1080 let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1081
1082 if request.confirmed {
1083 if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1084 paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1085 {
1086 HrmpChannels::<T>::insert(
1087 &channel_id,
1088 HrmpChannel {
1089 sender_deposit,
1090 recipient_deposit,
1091 max_capacity: request.max_capacity,
1092 max_total_size: request.max_total_size,
1093 max_message_size: request.max_message_size,
1094 msg_count: 0,
1095 total_size: 0,
1096 mqc_head: None,
1097 },
1098 );
1099
1100 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1101 if let Err(i) = v.binary_search(&channel_id.sender) {
1102 v.insert(i, channel_id.sender);
1103 }
1104 });
1105 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1106 if let Err(i) = v.binary_search(&channel_id.recipient) {
1107 v.insert(i, channel_id.recipient);
1108 }
1109 });
1110 }
1111
1112 Self::decrease_open_channel_request_count(channel_id.sender);
1113 Self::decrease_accepted_channel_request_count(channel_id.recipient);
1114
1115 let _ = open_req_channels.swap_remove(idx);
1116 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1117 }
1118 }
1119
1120 HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1121 }
1122
1123 fn process_hrmp_close_channel_requests() {
1125 let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1126 for condemned_ch_id in close_reqs {
1127 HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1128 Self::close_hrmp_channel(&condemned_ch_id);
1129 }
1130 }
1131
1132 fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1139 if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1140 HrmpChannels::<T>::take(channel_id)
1141 {
1142 T::Currency::unreserve(
1143 &channel_id.sender.into_account_truncating(),
1144 sender_deposit.unique_saturated_into(),
1145 );
1146 T::Currency::unreserve(
1147 &channel_id.recipient.into_account_truncating(),
1148 recipient_deposit.unique_saturated_into(),
1149 );
1150 }
1151
1152 HrmpChannelContents::<T>::remove(channel_id);
1153
1154 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1155 if let Ok(i) = v.binary_search(&channel_id.recipient) {
1156 v.remove(i);
1157 }
1158 });
1159 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1160 if let Ok(i) = v.binary_search(&channel_id.sender) {
1161 v.remove(i);
1162 }
1163 });
1164 }
1165
1166 pub(crate) fn check_hrmp_watermark(
1168 recipient: ParaId,
1169 relay_chain_parent_number: BlockNumberFor<T>,
1170 new_hrmp_watermark: BlockNumberFor<T>,
1171 ) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1172 if new_hrmp_watermark == relay_chain_parent_number {
1181 return Ok(())
1182 }
1183
1184 if new_hrmp_watermark > relay_chain_parent_number {
1185 return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1186 new_watermark: new_hrmp_watermark,
1187 relay_chain_parent_number,
1188 })
1189 }
1190
1191 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1192 if new_hrmp_watermark <= last_watermark {
1193 return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1194 new_watermark: new_hrmp_watermark,
1195 last_watermark,
1196 })
1197 }
1198 }
1199
1200 let digest = HrmpChannelDigests::<T>::get(&recipient);
1205 if !digest
1206 .binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1207 .is_ok()
1208 {
1209 return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1210 new_watermark: new_hrmp_watermark,
1211 })
1212 }
1213 Ok(())
1214 }
1215
1216 pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1218 HrmpChannelDigests::<T>::get(&recipient)
1219 .into_iter()
1220 .map(|(block_no, _)| block_no)
1221 .collect()
1222 }
1223
1224 pub(crate) fn check_outbound_hrmp(
1225 config: &HostConfiguration<BlockNumberFor<T>>,
1226 sender: ParaId,
1227 out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1228 ) -> Result<(), OutboundHrmpAcceptanceErr> {
1229 if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1230 return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1231 sent: out_hrmp_msgs.len() as u32,
1232 permitted: config.hrmp_max_message_num_per_candidate,
1233 })
1234 }
1235
1236 let mut last_recipient = None::<ParaId>;
1237
1238 for (idx, out_msg) in
1239 out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1240 {
1241 match last_recipient {
1242 Some(last_recipient) if out_msg.recipient <= last_recipient =>
1246 return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
1247 _ => last_recipient = Some(out_msg.recipient),
1248 }
1249
1250 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1251
1252 let channel = match HrmpChannels::<T>::get(&channel_id) {
1253 Some(channel) => channel,
1254 None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1255 };
1256
1257 let msg_size = out_msg.data.len() as u32;
1258 if msg_size > channel.max_message_size {
1259 return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1260 idx,
1261 msg_size,
1262 max_size: channel.max_message_size,
1263 })
1264 }
1265
1266 let new_total_size = channel.total_size + out_msg.data.len() as u32;
1267 if new_total_size > channel.max_total_size {
1268 return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1269 idx,
1270 total_size: new_total_size,
1271 limit: channel.max_total_size,
1272 })
1273 }
1274
1275 let new_msg_count = channel.msg_count + 1;
1276 if new_msg_count > channel.max_capacity {
1277 return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1278 idx,
1279 count: new_msg_count,
1280 limit: channel.max_capacity,
1281 })
1282 }
1283 }
1284
1285 Ok(())
1286 }
1287
1288 pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1290 let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1291 let mut remaining = Vec::with_capacity(recipients.len());
1292
1293 for recipient in recipients {
1294 let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1295 continue
1296 };
1297 remaining.push((
1298 recipient,
1299 (
1300 channel.max_capacity - channel.msg_count,
1301 channel.max_total_size - channel.total_size,
1302 ),
1303 ));
1304 }
1305
1306 remaining
1307 }
1308
1309 pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1310 let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1313 let mut senders = BTreeSet::new();
1314 let mut leftover = Vec::with_capacity(digest.len());
1315 for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1316 if block_no <= new_hrmp_watermark {
1317 senders.extend(paras_sent_msg);
1318 } else {
1319 leftover.push((block_no, paras_sent_msg));
1320 }
1321 }
1322 *digest = leftover;
1323 senders
1324 });
1325
1326 let channels_to_prune =
1328 senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1329 for channel_id in channels_to_prune {
1330 let (mut pruned_cnt, mut pruned_size) = (0, 0);
1333
1334 let contents = HrmpChannelContents::<T>::get(&channel_id);
1335 let mut leftover = Vec::with_capacity(contents.len());
1336 for msg in contents {
1337 if msg.sent_at <= new_hrmp_watermark {
1338 pruned_cnt += 1;
1339 pruned_size += msg.data.len();
1340 } else {
1341 leftover.push(msg);
1342 }
1343 }
1344 if !leftover.is_empty() {
1345 HrmpChannelContents::<T>::insert(&channel_id, leftover);
1346 } else {
1347 HrmpChannelContents::<T>::remove(&channel_id);
1348 }
1349
1350 HrmpChannels::<T>::mutate(&channel_id, |channel| {
1352 if let Some(ref mut channel) = channel {
1353 channel.msg_count -= pruned_cnt as u32;
1354 channel.total_size -= pruned_size as u32;
1355 }
1356 });
1357 }
1358
1359 HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1360 }
1361
1362 pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1364 let now = frame_system::Pallet::<T>::block_number();
1365
1366 for out_msg in out_hrmp_msgs {
1367 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1368
1369 let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1370 Some(channel) => channel,
1371 None => {
1372 continue
1375 },
1376 };
1377
1378 let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1379
1380 channel.msg_count += 1;
1382 channel.total_size += inbound.data.len() as u32;
1383
1384 let prev_head = channel.mqc_head.unwrap_or(Default::default());
1386 let new_head = BlakeTwo256::hash_of(&(
1387 prev_head,
1388 inbound.sent_at,
1389 T::Hashing::hash_of(&inbound.data),
1390 ));
1391 channel.mqc_head = Some(new_head);
1392
1393 HrmpChannels::<T>::insert(&channel_id, channel);
1394 HrmpChannelContents::<T>::append(&channel_id, inbound);
1395
1396 let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1409 if let Some(cur_block_digest) = recipient_digest
1410 .last_mut()
1411 .filter(|(block_no, _)| *block_no == now)
1412 .map(|(_, ref mut d)| d)
1413 {
1414 cur_block_digest.push(sender);
1415 } else {
1416 recipient_digest.push((now, vec![sender]));
1417 }
1418 HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1419 }
1420 }
1421
1422 pub fn init_open_channel(
1430 origin: ParaId,
1431 recipient: ParaId,
1432 proposed_max_capacity: u32,
1433 proposed_max_message_size: u32,
1434 ) -> DispatchResult {
1435 ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1436 ensure!(
1437 paras::Pallet::<T>::is_valid_para(recipient),
1438 Error::<T>::OpenHrmpChannelInvalidRecipient,
1439 );
1440
1441 let config = configuration::ActiveConfig::<T>::get();
1442 ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1443 ensure!(
1444 proposed_max_capacity <= config.hrmp_channel_max_capacity,
1445 Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1446 );
1447 ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1448 ensure!(
1449 proposed_max_message_size <= config.hrmp_channel_max_message_size,
1450 Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1451 );
1452
1453 let channel_id = HrmpChannelId { sender: origin, recipient };
1454 ensure!(
1455 HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1456 Error::<T>::OpenHrmpChannelAlreadyRequested,
1457 );
1458 ensure!(
1459 HrmpChannels::<T>::get(&channel_id).is_none(),
1460 Error::<T>::OpenHrmpChannelAlreadyExists,
1461 );
1462
1463 let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1464 let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1465 let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1466 ensure!(
1467 egress_cnt + open_req_cnt < channel_num_limit,
1468 Error::<T>::OpenHrmpChannelLimitExceeded,
1469 );
1470
1471 let is_system = origin.is_system() || recipient.is_system();
1473 let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1474 if !deposit.is_zero() {
1475 T::Currency::reserve(
1476 &origin.into_account_truncating(),
1477 deposit.unique_saturated_into(),
1478 )?;
1479 }
1480
1481 HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1484 HrmpOpenChannelRequests::<T>::insert(
1485 &channel_id,
1486 HrmpOpenChannelRequest {
1487 confirmed: false,
1488 _age: 0,
1489 sender_deposit: deposit,
1490 max_capacity: proposed_max_capacity,
1491 max_message_size: proposed_max_message_size,
1492 max_total_size: config.hrmp_channel_max_total_size,
1493 },
1494 );
1495 HrmpOpenChannelRequestsList::<T>::append(channel_id);
1496
1497 Self::send_to_para(
1498 "init_open_channel",
1499 &config,
1500 recipient,
1501 Self::wrap_notification(|| {
1502 use xcm::opaque::latest::{prelude::*, Xcm};
1503 Xcm(vec![HrmpNewChannelOpenRequest {
1504 sender: origin.into(),
1505 max_capacity: proposed_max_capacity,
1506 max_message_size: proposed_max_message_size,
1507 }])
1508 }),
1509 );
1510
1511 Ok(())
1512 }
1513
1514 pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1519 let channel_id = HrmpChannelId { sender, recipient: origin };
1520 let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1521 .ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1522 ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1523
1524 let config = configuration::ActiveConfig::<T>::get();
1527 let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1528 let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1529 let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1530 ensure!(
1531 ingress_cnt + accepted_cnt < channel_num_limit,
1532 Error::<T>::AcceptHrmpChannelLimitExceeded,
1533 );
1534
1535 let is_system = origin.is_system() || sender.is_system();
1537 let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1538 if !deposit.is_zero() {
1539 T::Currency::reserve(
1540 &origin.into_account_truncating(),
1541 deposit.unique_saturated_into(),
1542 )?;
1543 }
1544
1545 channel_req.confirmed = true;
1548 HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1549 HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1550
1551 Self::send_to_para(
1552 "accept_open_channel",
1553 &config,
1554 sender,
1555 Self::wrap_notification(|| {
1556 use xcm::opaque::latest::{prelude::*, Xcm};
1557 Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1558 }),
1559 );
1560
1561 Ok(())
1562 }
1563
1564 fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1565 ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1567
1568 let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1569 .ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1570 ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1571
1572 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1574 HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1575 if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1576 open_req_channels.swap_remove(pos);
1577 }
1578 });
1579
1580 Self::decrease_open_channel_request_count(channel_id.sender);
1581 T::Currency::unreserve(
1587 &channel_id.sender.into_account_truncating(),
1588 open_channel_req.sender_deposit.unique_saturated_into(),
1589 );
1590
1591 Ok(())
1592 }
1593
1594 fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1595 ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1597
1598 ensure!(
1600 HrmpChannels::<T>::get(&channel_id).is_some(),
1601 Error::<T>::CloseHrmpChannelDoesntExist,
1602 );
1603
1604 ensure!(
1606 HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1607 Error::<T>::CloseHrmpChannelAlreadyUnderway,
1608 );
1609
1610 HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1611 HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1612
1613 let config = configuration::ActiveConfig::<T>::get();
1614 let opposite_party =
1615 if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1616
1617 Self::send_to_para(
1618 "close_channel",
1619 &config,
1620 opposite_party,
1621 Self::wrap_notification(|| {
1622 use xcm::opaque::latest::{prelude::*, Xcm};
1623 Xcm(vec![HrmpChannelClosing {
1624 initiator: origin.into(),
1625 sender: channel_id.sender.into(),
1626 recipient: channel_id.recipient.into(),
1627 }])
1628 }),
1629 );
1630
1631 Ok(())
1632 }
1633
/// Returns the MQC head of every inbound channel of `recipient`, paired with the sender para id.
///
/// Senders are taken from `HrmpIngressChannelsIndex` in stored order. A channel that is missing
/// from `HrmpChannels`, or that has no head yet, is reported with the default hash.
#[cfg(test)]
fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
    HrmpIngressChannelsIndex::<T>::get(&recipient)
        .into_iter()
        .map(|sender| {
            let mqc_head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
                .and_then(|metadata| metadata.mqc_head)
                .unwrap_or_default();
            (sender, mqc_head)
        })
        .collect()
}
1653
1654 pub(crate) fn inbound_hrmp_channels_contents(
1657 recipient: ParaId,
1658 ) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1659 let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1660
1661 let mut inbound_hrmp_channels_contents = BTreeMap::new();
1662 for sender in sender_set {
1663 let channel_contents =
1664 HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1665 inbound_hrmp_channels_contents.insert(sender, channel_contents);
1666 }
1667
1668 inbound_hrmp_channels_contents
1669 }
1670}
1671
1672impl<T: Config> Pallet<T> {
1673 fn decrease_open_channel_request_count(sender: ParaId) {
1676 HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1677 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1678 0 => None,
1679 n => Some(n),
1680 });
1681 });
1682 }
1683
1684 fn decrease_accepted_channel_request_count(recipient: ParaId) {
1687 HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1688 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1689 0 => None,
1690 n => Some(n),
1691 });
1692 });
1693 }
1694
/// Exhaustively cross-checks the HRMP storage items against each other.
///
/// Intended for tests and benchmarks only: it iterates over whole storage maps and panics on the
/// first violated invariant.
#[cfg(any(feature = "runtime-benchmarks", test))]
fn assert_storage_consistency_exhaustive() {
    fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
        assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
    }

    let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
        for para in paras {
            assert!(
                crate::paras::Pallet::<T>::is_valid_para(para),
                "{}: {:?} para is offboarded",
                cause,
                para
            );
        }
    };

    // The open request map and its accompanying list must describe the same set of channels.
    assert_eq!(
        HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
        HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
    );

    // Exactly the senders of open requests have a request counter, and each counter equals the
    // number of requests initiated by that sender.
    assert_eq!(
        HrmpOpenChannelRequestCount::<T>::iter()
            .map(|(k, _)| k)
            .collect::<BTreeSet<_>>(),
        HrmpOpenChannelRequests::<T>::iter()
            .map(|(k, _)| k.sender)
            .collect::<BTreeSet<_>>(),
    );
    for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
        let actual_num = HrmpOpenChannelRequests::<T>::iter()
            .filter(|(ch, _)| ch.sender == open_channel_initiator)
            .count() as u32;
        assert_eq!(expected_num, actual_num);
    }

    // Exactly the recipients of confirmed requests have an accepted counter, and each counter
    // equals the number of confirmed requests addressed to that recipient.
    assert_eq!(
        HrmpAcceptedChannelRequestCount::<T>::iter()
            .map(|(k, _)| k)
            .collect::<BTreeSet<_>>(),
        HrmpOpenChannelRequests::<T>::iter()
            .filter(|(_, v)| v.confirmed)
            .map(|(k, _)| k.recipient)
            .collect::<BTreeSet<_>>(),
    );
    for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
        let actual_num = HrmpOpenChannelRequests::<T>::iter()
            .filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
            .count() as u32;
        assert_eq!(expected_num, actual_num);
    }

    // The close request map and its accompanying list must describe the same set of channels.
    assert_eq!(
        HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
        HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
    );

    assert_contains_only_onboarded(
        HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
        "HRMP watermarks should contain only onboarded paras",
    );

    // Stored contents must belong to an existing channel and must never be an empty vector.
    for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
        assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));

        assert!(!contents.is_empty());
    }

    assert_contains_only_onboarded(
        HrmpChannels::<T>::iter()
            .flat_map(|(k, _)| vec![k.sender, k.recipient])
            .collect::<Vec<_>>(),
        "senders and recipients in all channels should be onboarded",
    );

    // The ingress index, the egress index and the channel map must all describe the same set of
    // `(sender, recipient)` pairs.
    let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
        .flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
        .collect::<BTreeSet<_>>();
    let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
        .flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
        .collect::<BTreeSet<_>>();
    let channel_set_ground_truth = HrmpChannels::<T>::iter()
        .map(|(k, _)| (k.sender, k.recipient))
        .collect::<BTreeSet<_>>();
    assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
    assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);

    // Both index vectors must be sorted. Each check names the storage item it inspects so that a
    // failure points at the right index.
    HrmpIngressChannelsIndex::<T>::iter()
        .map(|(_, v)| v)
        .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
    HrmpEgressChannelsIndex::<T>::iter()
        .map(|(_, v)| v)
        .for_each(|v| assert_is_sorted(&v, "HrmpEgressChannelsIndex"));

    assert_contains_only_onboarded(
        HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
        "HRMP channel digests should contain only onboarded paras",
    );
    for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
        // Block numbers must be strictly ascending, which also rules out duplicate entries.
        assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));

        for (_, mut senders) in digest {
            assert!(!senders.is_empty());

            // Duplicate check: sort, deduplicate and verify that nothing was removed.
            senders.sort();
            let orig_senders = senders.clone();
            senders.dedup();
            assert_eq!(
                orig_senders, senders,
                "duplicates removed implies existence of duplicates"
            );
        }
    }
}
1847}
1848
1849impl<T: Config> Pallet<T> {
1850 fn wrap_notification(
1853 mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1854 ) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1855 use xcm::{
1856 opaque::VersionedXcm,
1857 prelude::{Junction, Location},
1858 WrapVersion,
1859 };
1860
1861 move |dest| {
1863 T::VersionWrapper::wrap_version(
1865 &Location::new(0, [Junction::Parachain(dest.into())]),
1866 notification(),
1867 )
1868 .unwrap_or_else(|_| {
1869 VersionedXcm::from(notification())
1872 })
1873 .encode()
1874 }
1875 }
1876
1877 fn send_to_para(
1879 log_label: &str,
1880 config: &HostConfiguration<BlockNumberFor<T>>,
1881 dest: ParaId,
1882 notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1883 ) {
1884 let notification_bytes = notification_bytes_for(dest);
1886
1887 if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1889 dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1890 {
1891 log::error!(
1894 target: "runtime::hrmp",
1895 "sending '{log_label}::notification_bytes' failed."
1896 );
1897 debug_assert!(false);
1898 }
1899 }
1900}