1use crate::{
18 configuration::{self, HostConfiguration},
19 dmp, ensure_parachain, initializer, paras,
20};
21use alloc::{
22 collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23 vec,
24 vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use frame_support::{pallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use frame_system::pallet_prelude::*;
30use polkadot_parachain_primitives::primitives::{HorizontalMessages, IsSystem};
31use polkadot_primitives::{
32 Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
33 SessionIndex,
34};
35use scale_info::TypeInfo;
36use sp_runtime::{
37 traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
38 ArithmeticError,
39};
40
41pub use pallet::*;
42
/// Bound on the number of inbound HRMP channels.
///
/// NOTE(review): this constant is not referenced in the visible part of the file; where the
/// bound is applied (benchmarks, runtime configuration, ...) should be confirmed.
pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
/// Bound on the number of outbound HRMP channels.
///
/// NOTE(review): same caveat as for the inbound bound — usage site not visible here.
pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
/// Weight functions for the dispatchables of this pallet.
///
/// Each function is named after the extrinsic whose `#[pallet::weight]` attribute calls it.
pub trait WeightInfo {
    /// Weight of `hrmp_init_open_channel`.
    fn hrmp_init_open_channel() -> Weight;
    /// Weight of `hrmp_accept_open_channel`.
    fn hrmp_accept_open_channel() -> Weight;
    /// Weight of `hrmp_close_channel`.
    fn hrmp_close_channel() -> Weight;
    /// Weight of `force_clean_hrmp`; the extrinsic passes its inbound witness as `i` and its
    /// outbound witness as `e`.
    fn force_clean_hrmp(i: u32, e: u32) -> Weight;
    /// Weight of `force_process_hrmp_open`; the extrinsic passes its `channels` witness as `c`.
    fn force_process_hrmp_open(c: u32) -> Weight;
    /// Weight of `force_process_hrmp_close`; the extrinsic passes its `channels` witness as `c`.
    fn force_process_hrmp_close(c: u32) -> Weight;
    /// Weight of `hrmp_cancel_open_request`; the extrinsic passes its `open_requests` witness
    /// as `c`.
    fn hrmp_cancel_open_request(c: u32) -> Weight;
    /// Weight reported by `Pallet::clean_open_channel_requests`, which passes the number of
    /// outgoing paras as `c`.
    fn clean_open_channel_requests(c: u32) -> Weight;
    /// Weight of `force_open_hrmp_channel`; `c` is `1` when an existing open request had to be
    /// cancelled first and `0` otherwise.
    fn force_open_hrmp_channel(c: u32) -> Weight;
    /// Weight of `establish_system_channel`.
    fn establish_system_channel() -> Weight;
    /// Weight of `poke_channel_deposits`.
    fn poke_channel_deposits() -> Weight;
    /// Weight of `establish_channel_with_system`.
    fn establish_channel_with_system() -> Weight;
}
71
72pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76 fn hrmp_accept_open_channel() -> Weight {
77 Weight::MAX
78 }
79 fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80 Weight::MAX
81 }
82 fn force_process_hrmp_close(_: u32) -> Weight {
83 Weight::MAX
84 }
85 fn force_process_hrmp_open(_: u32) -> Weight {
86 Weight::MAX
87 }
88 fn hrmp_cancel_open_request(_: u32) -> Weight {
89 Weight::MAX
90 }
91 fn hrmp_close_channel() -> Weight {
92 Weight::MAX
93 }
94 fn hrmp_init_open_channel() -> Weight {
95 Weight::MAX
96 }
97 fn clean_open_channel_requests(_: u32) -> Weight {
98 Weight::MAX
99 }
100 fn force_open_hrmp_channel(_: u32) -> Weight {
101 Weight::MAX
102 }
103 fn establish_system_channel() -> Weight {
104 Weight::MAX
105 }
106 fn poke_channel_deposits() -> Weight {
107 Weight::MAX
108 }
109 fn establish_channel_with_system() -> Weight {
110 Weight::MAX
111 }
112}
113
/// A request to open an HRMP channel, as stored in `HrmpOpenChannelRequests`.
#[derive(Encode, Decode, TypeInfo)]
pub struct HrmpOpenChannelRequest {
    /// Whether the request has been confirmed. `process_hrmp_open_channel_requests` only turns
    /// confirmed requests into channels.
    pub confirmed: bool,
    /// NOTE(review): never read in the visible code; the leading underscore suggests it is
    /// currently unused — confirm.
    pub _age: SessionIndex,
    /// Deposit recorded for the sender. It is copied into the resulting [`HrmpChannel`] when the
    /// request is processed, and unreserved when the request is cleaned up while the sender
    /// stays onboarded.
    pub sender_deposit: Balance,
    /// Copied into [`HrmpChannel::max_message_size`] when the channel is opened.
    pub max_message_size: u32,
    /// Copied into [`HrmpChannel::max_capacity`] when the channel is opened.
    pub max_capacity: u32,
    /// Copied into [`HrmpChannel::max_total_size`] when the channel is opened.
    pub max_total_size: u32,
}
131
/// Metadata of an open HRMP channel, as stored in `HrmpChannels`.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(Debug))]
pub struct HrmpChannel {
    /// Limit on `msg_count`; `check_outbound_hrmp` rejects a message that would exceed it.
    pub max_capacity: u32,
    /// Limit on `total_size`; `check_outbound_hrmp` rejects a message that would exceed it.
    pub max_total_size: u32,
    /// Limit on the size of a single message, enforced by `check_outbound_hrmp`.
    pub max_message_size: u32,
    /// Number of messages currently accounted to the channel: incremented when a message is
    /// queued and decremented when messages are pruned.
    pub msg_count: u32,
    /// Sum of the payload lengths of the messages currently accounted to the channel.
    pub total_size: u32,
    /// Head of the hash chain over the messages queued so far; `None` for a freshly opened
    /// channel.
    pub mqc_head: Option<Hash>,
    /// Amount unreserved from the sender's account when the channel is closed.
    pub sender_deposit: Balance,
    /// Amount unreserved from the recipient's account when the channel is closed.
    pub recipient_deposit: Balance,
}
167
/// Reasons for which `check_hrmp_watermark` rejects a proposed HRMP watermark.
pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
    /// The new watermark is not strictly greater than the last stored watermark.
    AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
    /// The new watermark is greater than the relay-parent block number.
    AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
    /// The new watermark is below the relay-parent number but does not match any block number
    /// recorded in the recipient's channel digest.
    LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
}
175
/// Reasons for which `check_outbound_hrmp` rejects a batch of outbound HRMP messages.
///
/// `idx` is always the position of the offending message within the batch.
pub(crate) enum OutboundHrmpAcceptanceErr {
    /// The batch holds more messages than `hrmp_max_message_num_per_candidate` allows.
    MoreMessagesThanPermitted { sent: u32, permitted: u32 },
    /// The recipient of the message at `idx` is not strictly greater than the previous one.
    NotSorted { idx: u32 },
    /// No channel exists from the sender to the recipient of the message at `idx`.
    NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
    /// The message at `idx` is larger than the channel's `max_message_size`.
    MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
    /// Adding the message at `idx` would push the channel past its `max_total_size`.
    TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
    /// Adding the message at `idx` would push the channel past its `max_capacity`.
    CapacityExceeded { idx: u32, count: u32, limit: u32 },
}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189 BlockNumber: fmt::Debug,
190{
191 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192 use HrmpWatermarkAcceptanceErr::*;
193 match self {
194 AdvancementRule { new_watermark, last_watermark } => write!(
195 fmt,
196 "the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197 new_watermark, last_watermark,
198 ),
199 AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200 fmt,
201 "the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202 new_watermark, relay_chain_parent_number
203 ),
204 LandsOnBlockWithNoMessages { new_watermark } => write!(
205 fmt,
206 "the HRMP watermark ({:?}) doesn't land on a block with messages received",
207 new_watermark
208 ),
209 }
210 }
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215 use OutboundHrmpAcceptanceErr::*;
216 match self {
217 MoreMessagesThanPermitted { sent, permitted } => write!(
218 fmt,
219 "more HRMP messages than permitted by config ({} > {})",
220 sent, permitted,
221 ),
222 NotSorted { idx } => {
223 write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224 },
225 NoSuchChannel { idx, channel_id } => write!(
226 fmt,
227 "the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228 idx, channel_id.sender, channel_id.recipient,
229 ),
230 MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231 fmt,
232 "the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233 idx, msg_size, max_size,
234 ),
235 TotalSizeExceeded { idx, total_size, limit } => write!(
236 fmt,
237 "sending the HRMP message at index {} would exceed the negotiated channel total size ({} > {})",
238 idx, total_size, limit,
239 ),
240 CapacityExceeded { idx, count, limit } => write!(
241 fmt,
242 "sending the HRMP message at index {} would exceed the negotiated channel capacity ({} > {})",
243 idx, count, limit,
244 ),
245 }
246 }
247}
248
249#[frame_support::pallet]
250pub mod pallet {
251 use super::*;
252
    /// The HRMP pallet.
    ///
    /// Storage info generation is disabled (`without_storage_info`); several storage items of
    /// this pallet hold plain `Vec`s.
    #[pallet::pallet]
    #[pallet::without_storage_info]
    pub struct Pallet<T>(_);
256
    /// Configuration of the HRMP pallet.
    #[pallet::config]
    pub trait Config:
        frame_system::Config + configuration::Config + paras::Config + dmp::Config
    {
        /// The aggregated runtime event type that this pallet's [`Event`] converts into.
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// The runtime origin type, convertible to and from the parachains `crate::Origin`.
        /// The parachain-gated extrinsics convert the system origin into this type before
        /// calling `ensure_parachain`.
        type RuntimeOrigin: From<crate::Origin>
            + From<<Self as frame_system::Config>::RuntimeOrigin>
            + Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;

        /// Origin required by the `force_*` extrinsics of this pallet.
        type ChannelManager: EnsureOrigin<<Self as frame_system::Config>::RuntimeOrigin>;

        /// Currency used to reserve and unreserve channel deposits on the accounts derived from
        /// para ids.
        type Currency: ReservableCurrency<Self::AccountId>;

        /// Parameters used by `establish_channel_with_system`. The tuple is read as
        /// `(max_message_size, max_capacity)`.
        type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;

        /// XCM version wrapper.
        ///
        /// NOTE(review): not used in the visible part of the file; presumably needed when
        /// notifying paras about channel changes — confirm.
        type VersionWrapper: xcm::WrapVersion;

        /// Weight functions for the extrinsics of this pallet.
        type WeightInfo: WeightInfo;
    }
293
    /// Events deposited by the HRMP pallet.
    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// Deposited by `hrmp_init_open_channel` after the open request was registered.
        OpenChannelRequested {
            sender: ParaId,
            recipient: ParaId,
            proposed_max_capacity: u32,
            proposed_max_message_size: u32,
        },
        /// Deposited by `hrmp_cancel_open_request`; `by_parachain` is the calling para.
        OpenChannelCanceled { by_parachain: ParaId, channel_id: HrmpChannelId },
        /// Deposited by `hrmp_accept_open_channel`; `recipient` is the calling para.
        OpenChannelAccepted { sender: ParaId, recipient: ParaId },
        /// Deposited by `hrmp_close_channel`; `by_parachain` is the calling para.
        ChannelClosed { by_parachain: ParaId, channel_id: HrmpChannelId },
        /// Deposited by `force_open_hrmp_channel`.
        HrmpChannelForceOpened {
            sender: ParaId,
            recipient: ParaId,
            proposed_max_capacity: u32,
            proposed_max_message_size: u32,
        },
        /// Deposited by `establish_system_channel` (once) and by `establish_channel_with_system`
        /// (once per direction).
        HrmpSystemChannelOpened {
            sender: ParaId,
            recipient: ParaId,
            proposed_max_capacity: u32,
            proposed_max_message_size: u32,
        },
        /// Deposited by `poke_channel_deposits` when the call succeeds.
        OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
    }
327
    /// Errors returned by the HRMP pallet.
    // NOTE(review): only `WrongWitness`, `ChannelCreationNotAuthorized` and
    // `OpenHrmpChannelDoesntExist` are raised in the portion of the file reviewed here. The
    // exact conditions for the remaining variants live in code that is not visible from this
    // excerpt, so they are intentionally left undocumented rather than guessed from their names.
    #[pallet::error]
    pub enum Error<T> {
        OpenHrmpChannelToSelf,
        OpenHrmpChannelInvalidRecipient,
        OpenHrmpChannelZeroCapacity,
        OpenHrmpChannelCapacityExceedsLimit,
        OpenHrmpChannelZeroMessageSize,
        OpenHrmpChannelMessageSizeExceedsLimit,
        OpenHrmpChannelAlreadyExists,
        OpenHrmpChannelAlreadyRequested,
        OpenHrmpChannelLimitExceeded,
        AcceptHrmpChannelDoesntExist,
        AcceptHrmpChannelAlreadyConfirmed,
        AcceptHrmpChannelLimitExceeded,
        CloseHrmpChannelUnauthorized,
        CloseHrmpChannelDoesntExist,
        CloseHrmpChannelAlreadyUnderway,
        CancelHrmpOpenChannelUnauthorized,
        /// Returned by `poke_channel_deposits` when no channel is stored for the given
        /// sender/recipient pair.
        OpenHrmpChannelDoesntExist,
        OpenHrmpChannelAlreadyConfirmed,
        /// The witness value supplied to an extrinsic is smaller than the actual number of
        /// items it is a witness for.
        WrongWitness,
        /// Returned by `establish_system_channel` and `establish_channel_with_system` when a
        /// para that is required to be a system para is not one.
        ChannelCreationNotAuthorized,
    }
371
    /// Pending open-channel requests, keyed by channel id.
    ///
    /// The code assumes this map holds exactly the ids listed in
    /// `HrmpOpenChannelRequestsList` (see the `expect` in `process_hrmp_open_channel_requests`).
    #[pallet::storage]
    pub type HrmpOpenChannelRequests<T: Config> =
        StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;

    /// The ids of all pending open-channel requests, as a list.
    #[pallet::storage]
    pub type HrmpOpenChannelRequestsList<T: Config> =
        StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

    /// A per-para counter related to open requests; removed when the para is offboarded.
    ///
    /// NOTE(review): presumably the number of open requests initiated by the para — it is
    /// maintained by helpers not visible here; confirm.
    #[pallet::storage]
    pub type HrmpOpenChannelRequestCount<T: Config> =
        StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

    /// A per-para counter related to accepted requests; removed when the para is offboarded.
    ///
    /// NOTE(review): presumably the number of confirmed requests where the para is the
    /// recipient — maintained by helpers not visible here; confirm.
    #[pallet::storage]
    pub type HrmpAcceptedChannelRequestCount<T: Config> =
        StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;

    /// Set of channels scheduled for closing. Entries are removed, and the channels closed, by
    /// `process_hrmp_close_channel_requests`.
    #[pallet::storage]
    pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;

    /// The ids of the channels scheduled for closing, as a list.
    #[pallet::storage]
    pub type HrmpCloseChannelRequestsList<T: Config> =
        StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;

    /// The last HRMP watermark accepted for each recipient para; written by `prune_hrmp`.
    #[pallet::storage]
    pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;

    /// Metadata of every open HRMP channel.
    #[pallet::storage]
    pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;

    /// For each recipient para, the senders it has an open channel from. The list is kept
    /// sorted: entries are inserted and removed at the position found by binary search.
    #[pallet::storage]
    pub type HrmpIngressChannelsIndex<T: Config> =
        StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

    /// For each sender para, the recipients it has an open channel to. Kept sorted in the same
    /// way as the ingress index.
    #[pallet::storage]
    pub type HrmpEgressChannelsIndex<T: Config> =
        StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;

    /// The messages currently stored for each channel.
    #[pallet::storage]
    pub type HrmpChannelContents<T: Config> = StorageMap<
        _,
        Twox64Concat,
        HrmpChannelId,
        Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
        ValueQuery,
    >;

    /// For each recipient para, a list of `(block number, senders)` entries.
    ///
    /// `check_hrmp_watermark` binary-searches this list by block number, so it is expected to
    /// be sorted by block number.
    #[pallet::storage]
    pub type HrmpChannelDigests<T: Config> =
        StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
473
    /// Genesis configuration of the HRMP pallet.
    #[pallet::genesis_config]
    #[derive(DefaultNoBound)]
    pub struct GenesisConfig<T: Config> {
        #[serde(skip)]
        _config: core::marker::PhantomData<T>,
        /// Channels to open at genesis. Each entry is read as
        /// `(sender, recipient, max_capacity, max_message_size)`.
        preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
    }
494
495 #[pallet::genesis_build]
496 impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
497 fn build(&self) {
498 initialize_storage::<T>(&self.preopen_hrmp_channels);
499 }
500 }
501
502 #[pallet::call]
503 impl<T: Config> Pallet<T> {
504 #[pallet::call_index(0)]
515 #[pallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
516 pub fn hrmp_init_open_channel(
517 origin: OriginFor<T>,
518 recipient: ParaId,
519 proposed_max_capacity: u32,
520 proposed_max_message_size: u32,
521 ) -> DispatchResult {
522 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
523 Self::init_open_channel(
524 origin,
525 recipient,
526 proposed_max_capacity,
527 proposed_max_message_size,
528 )?;
529 Self::deposit_event(Event::OpenChannelRequested {
530 sender: origin,
531 recipient,
532 proposed_max_capacity,
533 proposed_max_message_size,
534 });
535 Ok(())
536 }
537
538 #[pallet::call_index(1)]
542 #[pallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
543 pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
544 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
545 Self::accept_open_channel(origin, sender)?;
546 Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
547 Ok(())
548 }
549
550 #[pallet::call_index(2)]
555 #[pallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
556 pub fn hrmp_close_channel(
557 origin: OriginFor<T>,
558 channel_id: HrmpChannelId,
559 ) -> DispatchResult {
560 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
561 Self::close_channel(origin, channel_id.clone())?;
562 Self::deposit_event(Event::ChannelClosed { by_parachain: origin, channel_id });
563 Ok(())
564 }
565
566 #[pallet::call_index(3)]
574 #[pallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
575 pub fn force_clean_hrmp(
576 origin: OriginFor<T>,
577 para: ParaId,
578 num_inbound: u32,
579 num_outbound: u32,
580 ) -> DispatchResult {
581 T::ChannelManager::ensure_origin(origin)?;
582
583 ensure!(
584 HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
585 num_inbound as usize,
586 Error::<T>::WrongWitness
587 );
588 ensure!(
589 HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default() <=
590 num_outbound as usize,
591 Error::<T>::WrongWitness
592 );
593
594 Self::clean_hrmp_after_outgoing(¶);
595 Ok(())
596 }
597
598 #[pallet::call_index(4)]
607 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
608 pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
609 T::ChannelManager::ensure_origin(origin)?;
610
611 ensure!(
612 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
613 channels,
614 Error::<T>::WrongWitness
615 );
616
617 let host_config = configuration::ActiveConfig::<T>::get();
618 Self::process_hrmp_open_channel_requests(&host_config);
619 Ok(())
620 }
621
622 #[pallet::call_index(5)]
631 #[pallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
632 pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
633 T::ChannelManager::ensure_origin(origin)?;
634
635 ensure!(
636 HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
637 channels,
638 Error::<T>::WrongWitness
639 );
640
641 Self::process_hrmp_close_channel_requests();
642 Ok(())
643 }
644
645 #[pallet::call_index(6)]
654 #[pallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
655 pub fn hrmp_cancel_open_request(
656 origin: OriginFor<T>,
657 channel_id: HrmpChannelId,
658 open_requests: u32,
659 ) -> DispatchResult {
660 let origin = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
661 ensure!(
662 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32 <=
663 open_requests,
664 Error::<T>::WrongWitness
665 );
666 Self::cancel_open_request(origin, channel_id.clone())?;
667 Self::deposit_event(Event::OpenChannelCanceled { by_parachain: origin, channel_id });
668 Ok(())
669 }
670
671 #[pallet::call_index(7)]
680 #[pallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
681 pub fn force_open_hrmp_channel(
682 origin: OriginFor<T>,
683 sender: ParaId,
684 recipient: ParaId,
685 max_capacity: u32,
686 max_message_size: u32,
687 ) -> DispatchResultWithPostInfo {
688 T::ChannelManager::ensure_origin(origin)?;
689
690 let channel_id = HrmpChannelId { sender, recipient };
695 let cancel_request: u32 =
696 if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
697 Self::cancel_open_request(sender, channel_id)?;
698 1
699 } else {
700 0
701 };
702
703 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
706 Self::accept_open_channel(recipient, sender)?;
707 Self::deposit_event(Event::HrmpChannelForceOpened {
708 sender,
709 recipient,
710 proposed_max_capacity: max_capacity,
711 proposed_max_message_size: max_message_size,
712 });
713
714 Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
715 }
716
717 #[pallet::call_index(8)]
730 #[pallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
731 pub fn establish_system_channel(
732 origin: OriginFor<T>,
733 sender: ParaId,
734 recipient: ParaId,
735 ) -> DispatchResultWithPostInfo {
736 let _caller = ensure_signed(origin)?;
737
738 ensure!(
740 sender.is_system() && recipient.is_system(),
741 Error::<T>::ChannelCreationNotAuthorized
742 );
743
744 let config = configuration::ActiveConfig::<T>::get();
745 let max_message_size = config.hrmp_channel_max_message_size;
746 let max_capacity = config.hrmp_channel_max_capacity;
747
748 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
749 Self::accept_open_channel(recipient, sender)?;
750
751 Self::deposit_event(Event::HrmpSystemChannelOpened {
752 sender,
753 recipient,
754 proposed_max_capacity: max_capacity,
755 proposed_max_message_size: max_message_size,
756 });
757
758 Ok(Pays::No.into())
759 }
760
761 #[pallet::call_index(9)]
771 #[pallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
772 pub fn poke_channel_deposits(
773 origin: OriginFor<T>,
774 sender: ParaId,
775 recipient: ParaId,
776 ) -> DispatchResult {
777 let _caller = ensure_signed(origin)?;
778 let channel_id = HrmpChannelId { sender, recipient };
779 let is_system = sender.is_system() || recipient.is_system();
780
781 let config = configuration::ActiveConfig::<T>::get();
782
783 let (new_sender_deposit, new_recipient_deposit) = if is_system {
785 (0, 0)
786 } else {
787 (config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
788 };
789
790 let _ = HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
791 if let Some(ref mut channel) = channel {
792 let current_sender_deposit = channel.sender_deposit;
793 let current_recipient_deposit = channel.recipient_deposit;
794
795 if current_sender_deposit == new_sender_deposit &&
797 current_recipient_deposit == new_recipient_deposit
798 {
799 return Ok(())
800 }
801
802 if current_sender_deposit > new_sender_deposit {
804 let amount = current_sender_deposit
806 .checked_sub(new_sender_deposit)
807 .ok_or(ArithmeticError::Underflow)?;
808 T::Currency::unreserve(
809 &channel_id.sender.into_account_truncating(),
810 amount.try_into().unwrap_or(Zero::zero()),
813 );
814 } else if current_sender_deposit < new_sender_deposit {
815 let amount = new_sender_deposit
816 .checked_sub(current_sender_deposit)
817 .ok_or(ArithmeticError::Underflow)?;
818 T::Currency::reserve(
819 &channel_id.sender.into_account_truncating(),
820 amount.try_into().unwrap_or(Zero::zero()),
821 )?;
822 }
823
824 if current_recipient_deposit > new_recipient_deposit {
826 let amount = current_recipient_deposit
827 .checked_sub(new_recipient_deposit)
828 .ok_or(ArithmeticError::Underflow)?;
829 T::Currency::unreserve(
830 &channel_id.recipient.into_account_truncating(),
831 amount.try_into().unwrap_or(Zero::zero()),
832 );
833 } else if current_recipient_deposit < new_recipient_deposit {
834 let amount = new_recipient_deposit
835 .checked_sub(current_recipient_deposit)
836 .ok_or(ArithmeticError::Underflow)?;
837 T::Currency::reserve(
838 &channel_id.recipient.into_account_truncating(),
839 amount.try_into().unwrap_or(Zero::zero()),
840 )?;
841 }
842
843 channel.sender_deposit = new_sender_deposit;
845 channel.recipient_deposit = new_recipient_deposit;
846 } else {
847 return Err(Error::<T>::OpenHrmpChannelDoesntExist.into())
848 }
849 Ok(())
850 })?;
851
852 Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
853
854 Ok(())
855 }
856
857 #[pallet::call_index(10)]
865 #[pallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
866 pub fn establish_channel_with_system(
867 origin: OriginFor<T>,
868 target_system_chain: ParaId,
869 ) -> DispatchResultWithPostInfo {
870 let sender = ensure_parachain(<T as Config>::RuntimeOrigin::from(origin))?;
871
872 ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
873
874 let (max_message_size, max_capacity) =
875 T::DefaultChannelSizeAndCapacityWithSystem::get();
876
877 Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
879 Self::accept_open_channel(target_system_chain, sender)?;
880
881 Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
882 Self::accept_open_channel(sender, target_system_chain)?;
883
884 Self::deposit_event(Event::HrmpSystemChannelOpened {
885 sender,
886 recipient: target_system_chain,
887 proposed_max_capacity: max_capacity,
888 proposed_max_message_size: max_message_size,
889 });
890
891 Self::deposit_event(Event::HrmpSystemChannelOpened {
892 sender: target_system_chain,
893 recipient: sender,
894 proposed_max_capacity: max_capacity,
895 proposed_max_message_size: max_message_size,
896 });
897
898 Ok(Pays::No.into())
899 }
900 }
901}
902
903fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
904 let host_config = configuration::ActiveConfig::<T>::get();
905 for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
906 if let Err(err) =
907 preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
908 {
909 panic!("failed to initialize the genesis storage: {:?}", err);
910 }
911 }
912 Pallet::<T>::process_hrmp_open_channel_requests(&host_config);
913}
914
915fn preopen_hrmp_channel<T: Config>(
916 sender: ParaId,
917 recipient: ParaId,
918 max_capacity: u32,
919 max_message_size: u32,
920) -> DispatchResult {
921 Pallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
922 Pallet::<T>::accept_open_channel(recipient, sender)?;
923 Ok(())
924}
925
926impl<T: Config> Pallet<T> {
    /// Block initialization hook. Does nothing and reports zero weight.
    pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
        Weight::zero()
    }

    /// Block finalization hook. Does nothing.
    pub(crate) fn initializer_finalize() {}
935
936 pub(crate) fn initializer_on_new_session(
938 notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
939 outgoing_paras: &[ParaId],
940 ) -> Weight {
941 let w1 = Self::perform_outgoing_para_cleanup(¬ification.prev_config, outgoing_paras);
942 Self::process_hrmp_open_channel_requests(¬ification.prev_config);
943 Self::process_hrmp_close_channel_requests();
944 w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
945 outgoing_paras.len() as u32
946 ))
947 .saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
948 outgoing_paras.len() as u32,
949 ))
950 }
951
952 fn perform_outgoing_para_cleanup(
955 config: &HostConfiguration<BlockNumberFor<T>>,
956 outgoing: &[ParaId],
957 ) -> Weight {
958 let mut w = Self::clean_open_channel_requests(config, outgoing);
959 for outgoing_para in outgoing {
960 Self::clean_hrmp_after_outgoing(outgoing_para);
961
962 let ingress_count =
965 HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
966 let egress_count =
967 HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
968 w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
969 ingress_count,
970 egress_count,
971 ));
972 }
973 w
974 }
975
976 pub(crate) fn clean_open_channel_requests(
980 config: &HostConfiguration<BlockNumberFor<T>>,
981 outgoing: &[ParaId],
982 ) -> Weight {
983 let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
989 let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
990 .into_iter()
991 .partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
992 HrmpOpenChannelRequestsList::<T>::put(stay);
993
994 for req_id in go {
997 let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
998 Some(req_data) => req_data,
999 None => {
1000 continue
1002 },
1003 };
1004
1005 if !outgoing.contains(&req_id.sender) {
1007 T::Currency::unreserve(
1008 &req_id.sender.into_account_truncating(),
1009 req_data.sender_deposit.unique_saturated_into(),
1010 );
1011 }
1012
1013 if req_data.confirmed {
1019 if !outgoing.contains(&req_id.recipient) {
1020 T::Currency::unreserve(
1021 &req_id.recipient.into_account_truncating(),
1022 config.hrmp_recipient_deposit.unique_saturated_into(),
1023 );
1024 }
1025 Self::decrease_accepted_channel_request_count(req_id.recipient);
1026 }
1027 }
1028
1029 <T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1030 }
1031
1032 fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1034 HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1035 HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1036
1037 let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1038 .into_iter()
1039 .map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1040 let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1041 .into_iter()
1042 .map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1043 let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1044 to_close.sort();
1045 to_close.dedup();
1046
1047 for channel in to_close {
1048 Self::close_hrmp_channel(&channel);
1049 }
1050 }
1051
1052 fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1057 let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1058 if open_req_channels.is_empty() {
1059 return
1060 }
1061
1062 let mut idx = open_req_channels.len();
1065 loop {
1066 if idx == 0 {
1068 break
1069 }
1070
1071 idx -= 1;
1072 let channel_id = open_req_channels[idx].clone();
1073 let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1074 "can't be `None` due to the invariant that the list contains the same items as the set; qed",
1075 );
1076
1077 let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1078 let sender_deposit = request.sender_deposit;
1079 let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1080
1081 if request.confirmed {
1082 if paras::Pallet::<T>::is_valid_para(channel_id.sender) &&
1083 paras::Pallet::<T>::is_valid_para(channel_id.recipient)
1084 {
1085 HrmpChannels::<T>::insert(
1086 &channel_id,
1087 HrmpChannel {
1088 sender_deposit,
1089 recipient_deposit,
1090 max_capacity: request.max_capacity,
1091 max_total_size: request.max_total_size,
1092 max_message_size: request.max_message_size,
1093 msg_count: 0,
1094 total_size: 0,
1095 mqc_head: None,
1096 },
1097 );
1098
1099 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1100 if let Err(i) = v.binary_search(&channel_id.sender) {
1101 v.insert(i, channel_id.sender);
1102 }
1103 });
1104 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1105 if let Err(i) = v.binary_search(&channel_id.recipient) {
1106 v.insert(i, channel_id.recipient);
1107 }
1108 });
1109 }
1110
1111 Self::decrease_open_channel_request_count(channel_id.sender);
1112 Self::decrease_accepted_channel_request_count(channel_id.recipient);
1113
1114 let _ = open_req_channels.swap_remove(idx);
1115 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1116 }
1117 }
1118
1119 HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1120 }
1121
1122 fn process_hrmp_close_channel_requests() {
1124 let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1125 for condemned_ch_id in close_reqs {
1126 HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1127 Self::close_hrmp_channel(&condemned_ch_id);
1128 }
1129 }
1130
1131 fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1138 if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1139 HrmpChannels::<T>::take(channel_id)
1140 {
1141 T::Currency::unreserve(
1142 &channel_id.sender.into_account_truncating(),
1143 sender_deposit.unique_saturated_into(),
1144 );
1145 T::Currency::unreserve(
1146 &channel_id.recipient.into_account_truncating(),
1147 recipient_deposit.unique_saturated_into(),
1148 );
1149 }
1150
1151 HrmpChannelContents::<T>::remove(channel_id);
1152
1153 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1154 if let Ok(i) = v.binary_search(&channel_id.recipient) {
1155 v.remove(i);
1156 }
1157 });
1158 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1159 if let Ok(i) = v.binary_search(&channel_id.sender) {
1160 v.remove(i);
1161 }
1162 });
1163 }
1164
1165 pub(crate) fn check_hrmp_watermark(
1167 recipient: ParaId,
1168 relay_chain_parent_number: BlockNumberFor<T>,
1169 new_hrmp_watermark: BlockNumberFor<T>,
1170 ) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1171 if new_hrmp_watermark == relay_chain_parent_number {
1180 return Ok(())
1181 }
1182
1183 if new_hrmp_watermark > relay_chain_parent_number {
1184 return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1185 new_watermark: new_hrmp_watermark,
1186 relay_chain_parent_number,
1187 })
1188 }
1189
1190 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1191 if new_hrmp_watermark <= last_watermark {
1192 return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1193 new_watermark: new_hrmp_watermark,
1194 last_watermark,
1195 })
1196 }
1197 }
1198
1199 let digest = HrmpChannelDigests::<T>::get(&recipient);
1204 if !digest
1205 .binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1206 .is_ok()
1207 {
1208 return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1209 new_watermark: new_hrmp_watermark,
1210 })
1211 }
1212 Ok(())
1213 }
1214
1215 pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1217 HrmpChannelDigests::<T>::get(&recipient)
1218 .into_iter()
1219 .map(|(block_no, _)| block_no)
1220 .collect()
1221 }
1222
1223 pub(crate) fn check_outbound_hrmp(
1224 config: &HostConfiguration<BlockNumberFor<T>>,
1225 sender: ParaId,
1226 out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1227 ) -> Result<(), OutboundHrmpAcceptanceErr> {
1228 if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1229 return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1230 sent: out_hrmp_msgs.len() as u32,
1231 permitted: config.hrmp_max_message_num_per_candidate,
1232 })
1233 }
1234
1235 let mut last_recipient = None::<ParaId>;
1236
1237 for (idx, out_msg) in
1238 out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1239 {
1240 match last_recipient {
1241 Some(last_recipient) if out_msg.recipient <= last_recipient =>
1245 return Err(OutboundHrmpAcceptanceErr::NotSorted { idx }),
1246 _ => last_recipient = Some(out_msg.recipient),
1247 }
1248
1249 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1250
1251 let channel = match HrmpChannels::<T>::get(&channel_id) {
1252 Some(channel) => channel,
1253 None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1254 };
1255
1256 let msg_size = out_msg.data.len() as u32;
1257 if msg_size > channel.max_message_size {
1258 return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1259 idx,
1260 msg_size,
1261 max_size: channel.max_message_size,
1262 })
1263 }
1264
1265 let new_total_size = channel.total_size + out_msg.data.len() as u32;
1266 if new_total_size > channel.max_total_size {
1267 return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1268 idx,
1269 total_size: new_total_size,
1270 limit: channel.max_total_size,
1271 })
1272 }
1273
1274 let new_msg_count = channel.msg_count + 1;
1275 if new_msg_count > channel.max_capacity {
1276 return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1277 idx,
1278 count: new_msg_count,
1279 limit: channel.max_capacity,
1280 })
1281 }
1282 }
1283
1284 Ok(())
1285 }
1286
1287 pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1289 let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1290 let mut remaining = Vec::with_capacity(recipients.len());
1291
1292 for recipient in recipients {
1293 let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1294 continue
1295 };
1296 remaining.push((
1297 recipient,
1298 (
1299 channel.max_capacity - channel.msg_count,
1300 channel.max_total_size - channel.total_size,
1301 ),
1302 ));
1303 }
1304
1305 remaining
1306 }
1307
1308 pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1309 let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1312 let mut senders = BTreeSet::new();
1313 let mut leftover = Vec::with_capacity(digest.len());
1314 for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1315 if block_no <= new_hrmp_watermark {
1316 senders.extend(paras_sent_msg);
1317 } else {
1318 leftover.push((block_no, paras_sent_msg));
1319 }
1320 }
1321 *digest = leftover;
1322 senders
1323 });
1324
1325 let channels_to_prune =
1327 senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1328 for channel_id in channels_to_prune {
1329 let (mut pruned_cnt, mut pruned_size) = (0, 0);
1332
1333 let contents = HrmpChannelContents::<T>::get(&channel_id);
1334 let mut leftover = Vec::with_capacity(contents.len());
1335 for msg in contents {
1336 if msg.sent_at <= new_hrmp_watermark {
1337 pruned_cnt += 1;
1338 pruned_size += msg.data.len();
1339 } else {
1340 leftover.push(msg);
1341 }
1342 }
1343 if !leftover.is_empty() {
1344 HrmpChannelContents::<T>::insert(&channel_id, leftover);
1345 } else {
1346 HrmpChannelContents::<T>::remove(&channel_id);
1347 }
1348
1349 HrmpChannels::<T>::mutate(&channel_id, |channel| {
1351 if let Some(ref mut channel) = channel {
1352 channel.msg_count -= pruned_cnt as u32;
1353 channel.total_size -= pruned_size as u32;
1354 }
1355 });
1356 }
1357
1358 HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1359 }
1360
1361 pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1363 let now = frame_system::Pallet::<T>::block_number();
1364
1365 for out_msg in out_hrmp_msgs {
1366 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1367
1368 let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1369 Some(channel) => channel,
1370 None => {
1371 continue
1374 },
1375 };
1376
1377 let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1378
1379 channel.msg_count += 1;
1381 channel.total_size += inbound.data.len() as u32;
1382
1383 let prev_head = channel.mqc_head.unwrap_or(Default::default());
1385 let new_head = BlakeTwo256::hash_of(&(
1386 prev_head,
1387 inbound.sent_at,
1388 T::Hashing::hash_of(&inbound.data),
1389 ));
1390 channel.mqc_head = Some(new_head);
1391
1392 HrmpChannels::<T>::insert(&channel_id, channel);
1393 HrmpChannelContents::<T>::append(&channel_id, inbound);
1394
1395 let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1408 if let Some(cur_block_digest) = recipient_digest
1409 .last_mut()
1410 .filter(|(block_no, _)| *block_no == now)
1411 .map(|(_, ref mut d)| d)
1412 {
1413 cur_block_digest.push(sender);
1414 } else {
1415 recipient_digest.push((now, vec![sender]));
1416 }
1417 HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1418 }
1419 }
1420
1421 pub fn init_open_channel(
1429 origin: ParaId,
1430 recipient: ParaId,
1431 proposed_max_capacity: u32,
1432 proposed_max_message_size: u32,
1433 ) -> DispatchResult {
1434 ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1435 ensure!(
1436 paras::Pallet::<T>::is_valid_para(recipient),
1437 Error::<T>::OpenHrmpChannelInvalidRecipient,
1438 );
1439
1440 let config = configuration::ActiveConfig::<T>::get();
1441 ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1442 ensure!(
1443 proposed_max_capacity <= config.hrmp_channel_max_capacity,
1444 Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1445 );
1446 ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1447 ensure!(
1448 proposed_max_message_size <= config.hrmp_channel_max_message_size,
1449 Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1450 );
1451
1452 let channel_id = HrmpChannelId { sender: origin, recipient };
1453 ensure!(
1454 HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1455 Error::<T>::OpenHrmpChannelAlreadyRequested,
1456 );
1457 ensure!(
1458 HrmpChannels::<T>::get(&channel_id).is_none(),
1459 Error::<T>::OpenHrmpChannelAlreadyExists,
1460 );
1461
1462 let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1463 let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1464 let channel_num_limit = config.hrmp_max_parachain_outbound_channels;
1465 ensure!(
1466 egress_cnt + open_req_cnt < channel_num_limit,
1467 Error::<T>::OpenHrmpChannelLimitExceeded,
1468 );
1469
1470 let is_system = origin.is_system() || recipient.is_system();
1472 let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1473 if !deposit.is_zero() {
1474 T::Currency::reserve(
1475 &origin.into_account_truncating(),
1476 deposit.unique_saturated_into(),
1477 )?;
1478 }
1479
1480 HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1483 HrmpOpenChannelRequests::<T>::insert(
1484 &channel_id,
1485 HrmpOpenChannelRequest {
1486 confirmed: false,
1487 _age: 0,
1488 sender_deposit: deposit,
1489 max_capacity: proposed_max_capacity,
1490 max_message_size: proposed_max_message_size,
1491 max_total_size: config.hrmp_channel_max_total_size,
1492 },
1493 );
1494 HrmpOpenChannelRequestsList::<T>::append(channel_id);
1495
1496 Self::send_to_para(
1497 "init_open_channel",
1498 &config,
1499 recipient,
1500 Self::wrap_notification(|| {
1501 use xcm::opaque::latest::{prelude::*, Xcm};
1502 Xcm(vec![HrmpNewChannelOpenRequest {
1503 sender: origin.into(),
1504 max_capacity: proposed_max_capacity,
1505 max_message_size: proposed_max_message_size,
1506 }])
1507 }),
1508 );
1509
1510 Ok(())
1511 }
1512
1513 pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1518 let channel_id = HrmpChannelId { sender, recipient: origin };
1519 let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1520 .ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1521 ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1522
1523 let config = configuration::ActiveConfig::<T>::get();
1526 let channel_num_limit = config.hrmp_max_parachain_inbound_channels;
1527 let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1528 let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1529 ensure!(
1530 ingress_cnt + accepted_cnt < channel_num_limit,
1531 Error::<T>::AcceptHrmpChannelLimitExceeded,
1532 );
1533
1534 let is_system = origin.is_system() || sender.is_system();
1536 let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1537 if !deposit.is_zero() {
1538 T::Currency::reserve(
1539 &origin.into_account_truncating(),
1540 deposit.unique_saturated_into(),
1541 )?;
1542 }
1543
1544 channel_req.confirmed = true;
1547 HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1548 HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1549
1550 Self::send_to_para(
1551 "accept_open_channel",
1552 &config,
1553 sender,
1554 Self::wrap_notification(|| {
1555 use xcm::opaque::latest::{prelude::*, Xcm};
1556 Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1557 }),
1558 );
1559
1560 Ok(())
1561 }
1562
1563 fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1564 ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1566
1567 let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1568 .ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1569 ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1570
1571 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1573 HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1574 if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1575 open_req_channels.swap_remove(pos);
1576 }
1577 });
1578
1579 Self::decrease_open_channel_request_count(channel_id.sender);
1580 T::Currency::unreserve(
1586 &channel_id.sender.into_account_truncating(),
1587 open_channel_req.sender_deposit.unique_saturated_into(),
1588 );
1589
1590 Ok(())
1591 }
1592
1593 fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1594 ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1596
1597 ensure!(
1599 HrmpChannels::<T>::get(&channel_id).is_some(),
1600 Error::<T>::CloseHrmpChannelDoesntExist,
1601 );
1602
1603 ensure!(
1605 HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1606 Error::<T>::CloseHrmpChannelAlreadyUnderway,
1607 );
1608
1609 HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1610 HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1611
1612 let config = configuration::ActiveConfig::<T>::get();
1613 let opposite_party =
1614 if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1615
1616 Self::send_to_para(
1617 "close_channel",
1618 &config,
1619 opposite_party,
1620 Self::wrap_notification(|| {
1621 use xcm::opaque::latest::{prelude::*, Xcm};
1622 Xcm(vec![HrmpChannelClosing {
1623 initiator: origin.into(),
1624 sender: channel_id.sender.into(),
1625 recipient: channel_id.recipient.into(),
1626 }])
1627 }),
1628 );
1629
1630 Ok(())
1631 }
1632
/// Returns the MQC head of every inbound channel of `recipient`, paired with the sender.
///
/// A channel with no stored metadata, or with no head set yet, is reported with the default
/// hash.
#[cfg(test)]
fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
    HrmpIngressChannelsIndex::<T>::get(&recipient)
        .into_iter()
        .map(|sender| {
            let head = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient })
                .and_then(|metadata| metadata.mqc_head)
                .unwrap_or_default();
            (sender, head)
        })
        .collect()
}
1652
1653 pub(crate) fn inbound_hrmp_channels_contents(
1656 recipient: ParaId,
1657 ) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1658 let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1659
1660 let mut inbound_hrmp_channels_contents = BTreeMap::new();
1661 for sender in sender_set {
1662 let channel_contents =
1663 HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1664 inbound_hrmp_channels_contents.insert(sender, channel_contents);
1665 }
1666
1667 inbound_hrmp_channels_contents
1668 }
1669}
1670
1671impl<T: Config> Pallet<T> {
1672 fn decrease_open_channel_request_count(sender: ParaId) {
1675 HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1676 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1677 0 => None,
1678 n => Some(n),
1679 });
1680 });
1681 }
1682
1683 fn decrease_accepted_channel_request_count(recipient: ParaId) {
1686 HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1687 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1688 0 => None,
1689 n => Some(n),
1690 });
1691 });
1692 }
1693
1694 #[cfg(any(feature = "runtime-benchmarks", test))]
1695 fn assert_storage_consistency_exhaustive() {
1696 fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1697 assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1698 }
1699
1700 let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1701 for para in paras {
1702 assert!(
1703 crate::paras::Pallet::<T>::is_valid_para(para),
1704 "{}: {:?} para is offboarded",
1705 cause,
1706 para
1707 );
1708 }
1709 };
1710
1711 assert_eq!(
1712 HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1713 HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1714 );
1715
1716 assert_eq!(
1721 HrmpOpenChannelRequestCount::<T>::iter()
1722 .map(|(k, _)| k)
1723 .collect::<BTreeSet<_>>(),
1724 HrmpOpenChannelRequests::<T>::iter()
1725 .map(|(k, _)| k.sender)
1726 .collect::<BTreeSet<_>>(),
1727 );
1728 for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1729 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1730 .filter(|(ch, _)| ch.sender == open_channel_initiator)
1731 .count() as u32;
1732 assert_eq!(expected_num, actual_num);
1733 }
1734
1735 assert_eq!(
1738 HrmpAcceptedChannelRequestCount::<T>::iter()
1739 .map(|(k, _)| k)
1740 .collect::<BTreeSet<_>>(),
1741 HrmpOpenChannelRequests::<T>::iter()
1742 .filter(|(_, v)| v.confirmed)
1743 .map(|(k, _)| k.recipient)
1744 .collect::<BTreeSet<_>>(),
1745 );
1746 for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1747 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1748 .filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1749 .count() as u32;
1750 assert_eq!(expected_num, actual_num);
1751 }
1752
1753 assert_eq!(
1754 HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1755 HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1756 );
1757
1758 assert_contains_only_onboarded(
1761 HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1762 "HRMP watermarks should contain only onboarded paras",
1763 );
1764
1765 for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1768 assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1769
1770 assert!(!contents.is_empty());
1773 }
1774
1775 assert_contains_only_onboarded(
1778 HrmpChannels::<T>::iter()
1779 .flat_map(|(k, _)| vec![k.sender, k.recipient])
1780 .collect::<Vec<_>>(),
1781 "senders and recipients in all channels should be onboarded",
1782 );
1783
1784 let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1804 .flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1805 .collect::<BTreeSet<_>>();
1806 let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1807 .flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1808 .collect::<BTreeSet<_>>();
1809 let channel_set_ground_truth = HrmpChannels::<T>::iter()
1810 .map(|(k, _)| (k.sender, k.recipient))
1811 .collect::<BTreeSet<_>>();
1812 assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1813 assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1814
1815 HrmpIngressChannelsIndex::<T>::iter()
1816 .map(|(_, v)| v)
1817 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1818 HrmpEgressChannelsIndex::<T>::iter()
1819 .map(|(_, v)| v)
1820 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1821
1822 assert_contains_only_onboarded(
1823 HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1824 "HRMP channel digests should contain only onboarded paras",
1825 );
1826 for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1827 assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1830
1831 for (_, mut senders) in digest {
1832 assert!(!senders.is_empty());
1833
1834 senders.sort();
1837 let orig_senders = senders.clone();
1838 senders.dedup();
1839 assert_eq!(
1840 orig_senders, senders,
1841 "duplicates removed implies existence of duplicates"
1842 );
1843 }
1844 }
1845 }
1846}
1847
1848impl<T: Config> Pallet<T> {
1849 fn wrap_notification(
1852 mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1853 ) -> impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage {
1854 use xcm::{
1855 opaque::VersionedXcm,
1856 prelude::{Junction, Location},
1857 WrapVersion,
1858 };
1859
1860 move |dest| {
1862 T::VersionWrapper::wrap_version(
1864 &Location::new(0, [Junction::Parachain(dest.into())]),
1865 notification(),
1866 )
1867 .unwrap_or_else(|_| {
1868 VersionedXcm::from(notification())
1871 })
1872 .encode()
1873 }
1874 }
1875
1876 fn send_to_para(
1878 log_label: &str,
1879 config: &HostConfiguration<BlockNumberFor<T>>,
1880 dest: ParaId,
1881 notification_bytes_for: impl FnOnce(ParaId) -> polkadot_primitives::DownwardMessage,
1882 ) {
1883 let notification_bytes = notification_bytes_for(dest);
1885
1886 if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1888 dmp::Pallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1889 {
1890 log::error!(
1893 target: "runtime::hrmp",
1894 "sending '{log_label}::notification_bytes' failed."
1895 );
1896 debug_assert!(false);
1897 }
1898 }
1899}