1use crate::{
18 configuration::{self, HostConfiguration},
19 dmp, ensure_teyrchain, initializer, paras,
20};
21use alloc::{
22 collections::{btree_map::BTreeMap, btree_set::BTreeSet},
23 vec,
24 vec::Vec,
25};
26use codec::{Decode, Encode};
27use core::{fmt, mem};
28use pezframe_support::{pezpallet_prelude::*, traits::ReservableCurrency, DefaultNoBound};
29use pezframe_system::pezpallet_prelude::*;
30use pezkuwi_primitives::{
31 Balance, Hash, HrmpChannelId, Id as ParaId, InboundHrmpMessage, OutboundHrmpMessage,
32 SessionIndex,
33};
34use pezkuwi_teyrchain_primitives::primitives::{HorizontalMessages, IsSystem};
35use pezsp_runtime::{
36 traits::{AccountIdConversion, BlakeTwo256, Hash as HashT, UniqueSaturatedInto, Zero},
37 ArithmeticError,
38};
39use scale_info::TypeInfo;
40
41pub use pezpallet::*;
42
43pub const HRMP_MAX_INBOUND_CHANNELS_BOUND: u32 = 128;
48pub const HRMP_MAX_OUTBOUND_CHANNELS_BOUND: u32 = 128;
50
51#[cfg(test)]
52pub(crate) mod tests;
53
54#[cfg(feature = "runtime-benchmarks")]
55mod benchmarking;
56
57pub trait WeightInfo {
58 fn hrmp_init_open_channel() -> Weight;
59 fn hrmp_accept_open_channel() -> Weight;
60 fn hrmp_close_channel() -> Weight;
61 fn force_clean_hrmp(i: u32, e: u32) -> Weight;
62 fn force_process_hrmp_open(c: u32) -> Weight;
63 fn force_process_hrmp_close(c: u32) -> Weight;
64 fn hrmp_cancel_open_request(c: u32) -> Weight;
65 fn clean_open_channel_requests(c: u32) -> Weight;
66 fn force_open_hrmp_channel(c: u32) -> Weight;
67 fn establish_system_channel() -> Weight;
68 fn poke_channel_deposits() -> Weight;
69 fn establish_channel_with_system() -> Weight;
70}
71
72pub struct TestWeightInfo;
74
75impl WeightInfo for TestWeightInfo {
76 fn hrmp_accept_open_channel() -> Weight {
77 Weight::MAX
78 }
79 fn force_clean_hrmp(_: u32, _: u32) -> Weight {
80 Weight::MAX
81 }
82 fn force_process_hrmp_close(_: u32) -> Weight {
83 Weight::MAX
84 }
85 fn force_process_hrmp_open(_: u32) -> Weight {
86 Weight::MAX
87 }
88 fn hrmp_cancel_open_request(_: u32) -> Weight {
89 Weight::MAX
90 }
91 fn hrmp_close_channel() -> Weight {
92 Weight::MAX
93 }
94 fn hrmp_init_open_channel() -> Weight {
95 Weight::MAX
96 }
97 fn clean_open_channel_requests(_: u32) -> Weight {
98 Weight::MAX
99 }
100 fn force_open_hrmp_channel(_: u32) -> Weight {
101 Weight::MAX
102 }
103 fn establish_system_channel() -> Weight {
104 Weight::MAX
105 }
106 fn poke_channel_deposits() -> Weight {
107 Weight::MAX
108 }
109 fn establish_channel_with_system() -> Weight {
110 Weight::MAX
111 }
112}
113
114#[derive(Encode, Decode, TypeInfo)]
116pub struct HrmpOpenChannelRequest {
117 pub confirmed: bool,
119 pub _age: SessionIndex,
122 pub sender_deposit: Balance,
124 pub max_message_size: u32,
126 pub max_capacity: u32,
128 pub max_total_size: u32,
130}
131
132#[derive(Encode, Decode, TypeInfo)]
134#[cfg_attr(test, derive(Debug))]
135pub struct HrmpChannel {
136 pub max_capacity: u32,
144 pub max_total_size: u32,
146 pub max_message_size: u32,
148 pub msg_count: u32,
151 pub total_size: u32,
154 pub mqc_head: Option<Hash>,
162 pub sender_deposit: Balance,
164 pub recipient_deposit: Balance,
166}
167
168pub(crate) enum HrmpWatermarkAcceptanceErr<BlockNumber> {
171 AdvancementRule { new_watermark: BlockNumber, last_watermark: BlockNumber },
172 AheadRelayParent { new_watermark: BlockNumber, relay_chain_parent_number: BlockNumber },
173 LandsOnBlockWithNoMessages { new_watermark: BlockNumber },
174}
175
176pub(crate) enum OutboundHrmpAcceptanceErr {
179 MoreMessagesThanPermitted { sent: u32, permitted: u32 },
180 NotSorted { idx: u32 },
181 NoSuchChannel { idx: u32, channel_id: HrmpChannelId },
182 MaxMessageSizeExceeded { idx: u32, msg_size: u32, max_size: u32 },
183 TotalSizeExceeded { idx: u32, total_size: u32, limit: u32 },
184 CapacityExceeded { idx: u32, count: u32, limit: u32 },
185}
186
187impl<BlockNumber> fmt::Debug for HrmpWatermarkAcceptanceErr<BlockNumber>
188where
189 BlockNumber: fmt::Debug,
190{
191 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
192 use HrmpWatermarkAcceptanceErr::*;
193 match self {
194 AdvancementRule { new_watermark, last_watermark } => write!(
195 fmt,
196 "the HRMP watermark is not advanced relative to the last watermark ({:?} > {:?})",
197 new_watermark, last_watermark,
198 ),
199 AheadRelayParent { new_watermark, relay_chain_parent_number } => write!(
200 fmt,
201 "the HRMP watermark is ahead the relay-parent ({:?} > {:?})",
202 new_watermark, relay_chain_parent_number
203 ),
204 LandsOnBlockWithNoMessages { new_watermark } => write!(
205 fmt,
206 "the HRMP watermark ({:?}) doesn't land on a block with messages received",
207 new_watermark
208 ),
209 }
210 }
211}
212
213impl fmt::Debug for OutboundHrmpAcceptanceErr {
214 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
215 use OutboundHrmpAcceptanceErr::*;
216 match self {
217 MoreMessagesThanPermitted { sent, permitted } => write!(
218 fmt,
219 "more HRMP messages than permitted by config ({} > {})",
220 sent, permitted,
221 ),
222 NotSorted { idx } => {
223 write!(fmt, "the HRMP messages are not sorted (first unsorted is at index {})", idx,)
224 },
225 NoSuchChannel { idx, channel_id } => write!(
226 fmt,
227 "the HRMP message at index {} is sent to a non existent channel {:?}->{:?}",
228 idx, channel_id.sender, channel_id.recipient,
229 ),
230 MaxMessageSizeExceeded { idx, msg_size, max_size } => write!(
231 fmt,
232 "the HRMP message at index {} exceeds the negotiated channel maximum message size ({} > {})",
233 idx, msg_size, max_size,
234 ),
235 TotalSizeExceeded { idx, total_size, limit } => write!(
236 fmt,
237 "sending the HRMP message at index {} would exceed the negotiated channel total size ({} > {})",
238 idx, total_size, limit,
239 ),
240 CapacityExceeded { idx, count, limit } => write!(
241 fmt,
242 "sending the HRMP message at index {} would exceed the negotiated channel capacity ({} > {})",
243 idx, count, limit,
244 ),
245 }
246 }
247}
248
249#[pezframe_support::pezpallet]
250pub mod pezpallet {
251 use super::*;
252
253 #[pezpallet::pezpallet]
254 #[pezpallet::without_storage_info]
255 pub struct Pezpallet<T>(_);
256
257 #[pezpallet::config]
258 pub trait Config:
259 pezframe_system::Config + configuration::Config + paras::Config + dmp::Config
260 {
261 #[allow(deprecated)]
263 type RuntimeEvent: From<Event<Self>>
264 + IsType<<Self as pezframe_system::Config>::RuntimeEvent>;
265
266 type RuntimeOrigin: From<crate::Origin>
267 + From<<Self as pezframe_system::Config>::RuntimeOrigin>
268 + Into<Result<crate::Origin, <Self as Config>::RuntimeOrigin>>;
269
270 type ChannelManager: EnsureOrigin<<Self as pezframe_system::Config>::RuntimeOrigin>;
272
273 type Currency: ReservableCurrency<Self::AccountId>;
279
280 type DefaultChannelSizeAndCapacityWithSystem: Get<(u32, u32)>;
283
284 type VersionWrapper: xcm::WrapVersion;
291
292 type WeightInfo: WeightInfo;
294 }
295
296 #[pezpallet::event]
297 #[pezpallet::generate_deposit(pub(super) fn deposit_event)]
298 pub enum Event<T: Config> {
299 OpenChannelRequested {
301 sender: ParaId,
302 recipient: ParaId,
303 proposed_max_capacity: u32,
304 proposed_max_message_size: u32,
305 },
306 OpenChannelCanceled { by_teyrchain: ParaId, channel_id: HrmpChannelId },
308 OpenChannelAccepted { sender: ParaId, recipient: ParaId },
310 ChannelClosed { by_teyrchain: ParaId, channel_id: HrmpChannelId },
312 HrmpChannelForceOpened {
314 sender: ParaId,
315 recipient: ParaId,
316 proposed_max_capacity: u32,
317 proposed_max_message_size: u32,
318 },
319 HrmpSystemChannelOpened {
321 sender: ParaId,
322 recipient: ParaId,
323 proposed_max_capacity: u32,
324 proposed_max_message_size: u32,
325 },
326 OpenChannelDepositsUpdated { sender: ParaId, recipient: ParaId },
328 }
329
330 #[pezpallet::error]
331 pub enum Error<T> {
332 OpenHrmpChannelToSelf,
334 OpenHrmpChannelInvalidRecipient,
336 OpenHrmpChannelZeroCapacity,
338 OpenHrmpChannelCapacityExceedsLimit,
340 OpenHrmpChannelZeroMessageSize,
342 OpenHrmpChannelMessageSizeExceedsLimit,
344 OpenHrmpChannelAlreadyExists,
346 OpenHrmpChannelAlreadyRequested,
348 OpenHrmpChannelLimitExceeded,
350 AcceptHrmpChannelDoesntExist,
352 AcceptHrmpChannelAlreadyConfirmed,
354 AcceptHrmpChannelLimitExceeded,
356 CloseHrmpChannelUnauthorized,
358 CloseHrmpChannelDoesntExist,
360 CloseHrmpChannelAlreadyUnderway,
362 CancelHrmpOpenChannelUnauthorized,
364 OpenHrmpChannelDoesntExist,
366 OpenHrmpChannelAlreadyConfirmed,
368 WrongWitness,
370 ChannelCreationNotAuthorized,
372 }
373
374 #[pezpallet::storage]
381 pub type HrmpOpenChannelRequests<T: Config> =
382 StorageMap<_, Twox64Concat, HrmpChannelId, HrmpOpenChannelRequest>;
383
384 #[pezpallet::storage]
388 pub type HrmpOpenChannelRequestsList<T: Config> =
389 StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
390
391 #[pezpallet::storage]
395 pub type HrmpOpenChannelRequestCount<T: Config> =
396 StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
397
398 #[pezpallet::storage]
402 pub type HrmpAcceptedChannelRequestCount<T: Config> =
403 StorageMap<_, Twox64Concat, ParaId, u32, ValueQuery>;
404
405 #[pezpallet::storage]
413 pub type HrmpCloseChannelRequests<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, ()>;
414
415 #[pezpallet::storage]
416 pub type HrmpCloseChannelRequestsList<T: Config> =
417 StorageValue<_, Vec<HrmpChannelId>, ValueQuery>;
418
419 #[pezpallet::storage]
424 pub type HrmpWatermarks<T: Config> = StorageMap<_, Twox64Concat, ParaId, BlockNumberFor<T>>;
425
426 #[pezpallet::storage]
430 pub type HrmpChannels<T: Config> = StorageMap<_, Twox64Concat, HrmpChannelId, HrmpChannel>;
431
432 #[pezpallet::storage]
446 pub type HrmpIngressChannelsIndex<T: Config> =
447 StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
448
449 #[pezpallet::storage]
452 pub type HrmpEgressChannelsIndex<T: Config> =
453 StorageMap<_, Twox64Concat, ParaId, Vec<ParaId>, ValueQuery>;
454
455 #[pezpallet::storage]
458 pub type HrmpChannelContents<T: Config> = StorageMap<
459 _,
460 Twox64Concat,
461 HrmpChannelId,
462 Vec<InboundHrmpMessage<BlockNumberFor<T>>>,
463 ValueQuery,
464 >;
465
466 #[pezpallet::storage]
473 pub type HrmpChannelDigests<T: Config> =
474 StorageMap<_, Twox64Concat, ParaId, Vec<(BlockNumberFor<T>, Vec<ParaId>)>, ValueQuery>;
475
476 #[pezpallet::genesis_config]
490 #[derive(DefaultNoBound)]
491 pub struct GenesisConfig<T: Config> {
492 #[serde(skip)]
493 _config: core::marker::PhantomData<T>,
494 preopen_hrmp_channels: Vec<(ParaId, ParaId, u32, u32)>,
495 }
496
497 #[pezpallet::genesis_build]
498 impl<T: Config> BuildGenesisConfig for GenesisConfig<T> {
499 fn build(&self) {
500 initialize_storage::<T>(&self.preopen_hrmp_channels);
501 }
502 }
503
504 #[pezpallet::call]
505 impl<T: Config> Pezpallet<T> {
506 #[pezpallet::call_index(0)]
517 #[pezpallet::weight(<T as Config>::WeightInfo::hrmp_init_open_channel())]
518 pub fn hrmp_init_open_channel(
519 origin: OriginFor<T>,
520 recipient: ParaId,
521 proposed_max_capacity: u32,
522 proposed_max_message_size: u32,
523 ) -> DispatchResult {
524 let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
525 Self::init_open_channel(
526 origin,
527 recipient,
528 proposed_max_capacity,
529 proposed_max_message_size,
530 )?;
531 Self::deposit_event(Event::OpenChannelRequested {
532 sender: origin,
533 recipient,
534 proposed_max_capacity,
535 proposed_max_message_size,
536 });
537 Ok(())
538 }
539
540 #[pezpallet::call_index(1)]
544 #[pezpallet::weight(<T as Config>::WeightInfo::hrmp_accept_open_channel())]
545 pub fn hrmp_accept_open_channel(origin: OriginFor<T>, sender: ParaId) -> DispatchResult {
546 let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
547 Self::accept_open_channel(origin, sender)?;
548 Self::deposit_event(Event::OpenChannelAccepted { sender, recipient: origin });
549 Ok(())
550 }
551
552 #[pezpallet::call_index(2)]
557 #[pezpallet::weight(<T as Config>::WeightInfo::hrmp_close_channel())]
558 pub fn hrmp_close_channel(
559 origin: OriginFor<T>,
560 channel_id: HrmpChannelId,
561 ) -> DispatchResult {
562 let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
563 Self::close_channel(origin, channel_id.clone())?;
564 Self::deposit_event(Event::ChannelClosed { by_teyrchain: origin, channel_id });
565 Ok(())
566 }
567
568 #[pezpallet::call_index(3)]
576 #[pezpallet::weight(<T as Config>::WeightInfo::force_clean_hrmp(*num_inbound, *num_outbound))]
577 pub fn force_clean_hrmp(
578 origin: OriginFor<T>,
579 para: ParaId,
580 num_inbound: u32,
581 num_outbound: u32,
582 ) -> DispatchResult {
583 T::ChannelManager::ensure_origin(origin)?;
584
585 ensure!(
586 HrmpIngressChannelsIndex::<T>::decode_len(para).unwrap_or_default()
587 <= num_inbound as usize,
588 Error::<T>::WrongWitness
589 );
590 ensure!(
591 HrmpEgressChannelsIndex::<T>::decode_len(para).unwrap_or_default()
592 <= num_outbound as usize,
593 Error::<T>::WrongWitness
594 );
595
596 Self::clean_hrmp_after_outgoing(¶);
597 Ok(())
598 }
599
600 #[pezpallet::call_index(4)]
609 #[pezpallet::weight(<T as Config>::WeightInfo::force_process_hrmp_open(*channels))]
610 pub fn force_process_hrmp_open(origin: OriginFor<T>, channels: u32) -> DispatchResult {
611 T::ChannelManager::ensure_origin(origin)?;
612
613 ensure!(
614 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32
615 <= channels,
616 Error::<T>::WrongWitness
617 );
618
619 let host_config = configuration::ActiveConfig::<T>::get();
620 Self::process_hrmp_open_channel_requests(&host_config);
621 Ok(())
622 }
623
624 #[pezpallet::call_index(5)]
633 #[pezpallet::weight(<T as Config>::WeightInfo::force_process_hrmp_close(*channels))]
634 pub fn force_process_hrmp_close(origin: OriginFor<T>, channels: u32) -> DispatchResult {
635 T::ChannelManager::ensure_origin(origin)?;
636
637 ensure!(
638 HrmpCloseChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32
639 <= channels,
640 Error::<T>::WrongWitness
641 );
642
643 Self::process_hrmp_close_channel_requests();
644 Ok(())
645 }
646
647 #[pezpallet::call_index(6)]
656 #[pezpallet::weight(<T as Config>::WeightInfo::hrmp_cancel_open_request(*open_requests))]
657 pub fn hrmp_cancel_open_request(
658 origin: OriginFor<T>,
659 channel_id: HrmpChannelId,
660 open_requests: u32,
661 ) -> DispatchResult {
662 let origin = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
663 ensure!(
664 HrmpOpenChannelRequestsList::<T>::decode_len().unwrap_or_default() as u32
665 <= open_requests,
666 Error::<T>::WrongWitness
667 );
668 Self::cancel_open_request(origin, channel_id.clone())?;
669 Self::deposit_event(Event::OpenChannelCanceled { by_teyrchain: origin, channel_id });
670 Ok(())
671 }
672
673 #[pezpallet::call_index(7)]
682 #[pezpallet::weight(<T as Config>::WeightInfo::force_open_hrmp_channel(1))]
683 pub fn force_open_hrmp_channel(
684 origin: OriginFor<T>,
685 sender: ParaId,
686 recipient: ParaId,
687 max_capacity: u32,
688 max_message_size: u32,
689 ) -> DispatchResultWithPostInfo {
690 T::ChannelManager::ensure_origin(origin)?;
691
692 let channel_id = HrmpChannelId { sender, recipient };
697 let cancel_request: u32 =
698 if let Some(_open_channel) = HrmpOpenChannelRequests::<T>::get(&channel_id) {
699 Self::cancel_open_request(sender, channel_id)?;
700 1
701 } else {
702 0
703 };
704
705 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
708 Self::accept_open_channel(recipient, sender)?;
709 Self::deposit_event(Event::HrmpChannelForceOpened {
710 sender,
711 recipient,
712 proposed_max_capacity: max_capacity,
713 proposed_max_message_size: max_message_size,
714 });
715
716 Ok(Some(<T as Config>::WeightInfo::force_open_hrmp_channel(cancel_request)).into())
717 }
718
719 #[pezpallet::call_index(8)]
732 #[pezpallet::weight(<T as Config>::WeightInfo::establish_system_channel())]
733 pub fn establish_system_channel(
734 origin: OriginFor<T>,
735 sender: ParaId,
736 recipient: ParaId,
737 ) -> DispatchResultWithPostInfo {
738 let _caller = ensure_signed(origin)?;
739
740 ensure!(
742 sender.is_system() && recipient.is_system(),
743 Error::<T>::ChannelCreationNotAuthorized
744 );
745
746 let config = configuration::ActiveConfig::<T>::get();
747 let max_message_size = config.hrmp_channel_max_message_size;
748 let max_capacity = config.hrmp_channel_max_capacity;
749
750 Self::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
751 Self::accept_open_channel(recipient, sender)?;
752
753 Self::deposit_event(Event::HrmpSystemChannelOpened {
754 sender,
755 recipient,
756 proposed_max_capacity: max_capacity,
757 proposed_max_message_size: max_message_size,
758 });
759
760 Ok(Pays::No.into())
761 }
762
763 #[pezpallet::call_index(9)]
773 #[pezpallet::weight(<T as Config>::WeightInfo::poke_channel_deposits())]
774 pub fn poke_channel_deposits(
775 origin: OriginFor<T>,
776 sender: ParaId,
777 recipient: ParaId,
778 ) -> DispatchResult {
779 let _caller = ensure_signed(origin)?;
780 let channel_id = HrmpChannelId { sender, recipient };
781 let is_system = sender.is_system() || recipient.is_system();
782
783 let config = configuration::ActiveConfig::<T>::get();
784
785 let (new_sender_deposit, new_recipient_deposit) = if is_system {
787 (0, 0)
788 } else {
789 (config.hrmp_sender_deposit, config.hrmp_recipient_deposit)
790 };
791
792 HrmpChannels::<T>::mutate(&channel_id, |channel| -> DispatchResult {
793 if let Some(ref mut channel) = channel {
794 let current_sender_deposit = channel.sender_deposit;
795 let current_recipient_deposit = channel.recipient_deposit;
796
797 if current_sender_deposit == new_sender_deposit
799 && current_recipient_deposit == new_recipient_deposit
800 {
801 return Ok(());
802 }
803
804 if current_sender_deposit > new_sender_deposit {
806 let amount = current_sender_deposit
808 .checked_sub(new_sender_deposit)
809 .ok_or(ArithmeticError::Underflow)?;
810 T::Currency::unreserve(
811 &channel_id.sender.into_account_truncating(),
812 amount.try_into().unwrap_or(Zero::zero()),
815 );
816 } else if current_sender_deposit < new_sender_deposit {
817 let amount = new_sender_deposit
818 .checked_sub(current_sender_deposit)
819 .ok_or(ArithmeticError::Underflow)?;
820 T::Currency::reserve(
821 &channel_id.sender.into_account_truncating(),
822 amount.try_into().unwrap_or(Zero::zero()),
823 )?;
824 }
825
826 if current_recipient_deposit > new_recipient_deposit {
828 let amount = current_recipient_deposit
829 .checked_sub(new_recipient_deposit)
830 .ok_or(ArithmeticError::Underflow)?;
831 T::Currency::unreserve(
832 &channel_id.recipient.into_account_truncating(),
833 amount.try_into().unwrap_or(Zero::zero()),
834 );
835 } else if current_recipient_deposit < new_recipient_deposit {
836 let amount = new_recipient_deposit
837 .checked_sub(current_recipient_deposit)
838 .ok_or(ArithmeticError::Underflow)?;
839 T::Currency::reserve(
840 &channel_id.recipient.into_account_truncating(),
841 amount.try_into().unwrap_or(Zero::zero()),
842 )?;
843 }
844
845 channel.sender_deposit = new_sender_deposit;
847 channel.recipient_deposit = new_recipient_deposit;
848 } else {
849 return Err(Error::<T>::OpenHrmpChannelDoesntExist.into());
850 }
851 Ok(())
852 })?;
853
854 Self::deposit_event(Event::OpenChannelDepositsUpdated { sender, recipient });
855
856 Ok(())
857 }
858
859 #[pezpallet::call_index(10)]
867 #[pezpallet::weight(<T as Config>::WeightInfo::establish_channel_with_system())]
868 pub fn establish_channel_with_system(
869 origin: OriginFor<T>,
870 target_system_chain: ParaId,
871 ) -> DispatchResultWithPostInfo {
872 let sender = ensure_teyrchain(<T as Config>::RuntimeOrigin::from(origin))?;
873
874 ensure!(target_system_chain.is_system(), Error::<T>::ChannelCreationNotAuthorized);
875
876 let (max_message_size, max_capacity) =
877 T::DefaultChannelSizeAndCapacityWithSystem::get();
878
879 Self::init_open_channel(sender, target_system_chain, max_capacity, max_message_size)?;
881 Self::accept_open_channel(target_system_chain, sender)?;
882
883 Self::init_open_channel(target_system_chain, sender, max_capacity, max_message_size)?;
884 Self::accept_open_channel(sender, target_system_chain)?;
885
886 Self::deposit_event(Event::HrmpSystemChannelOpened {
887 sender,
888 recipient: target_system_chain,
889 proposed_max_capacity: max_capacity,
890 proposed_max_message_size: max_message_size,
891 });
892
893 Self::deposit_event(Event::HrmpSystemChannelOpened {
894 sender: target_system_chain,
895 recipient: sender,
896 proposed_max_capacity: max_capacity,
897 proposed_max_message_size: max_message_size,
898 });
899
900 Ok(Pays::No.into())
901 }
902 }
903}
904
905fn initialize_storage<T: Config>(preopen_hrmp_channels: &[(ParaId, ParaId, u32, u32)]) {
906 let host_config = configuration::ActiveConfig::<T>::get();
907 for &(sender, recipient, max_capacity, max_message_size) in preopen_hrmp_channels {
908 if let Err(err) =
909 preopen_hrmp_channel::<T>(sender, recipient, max_capacity, max_message_size)
910 {
911 panic!("failed to initialize the genesis storage: {:?}", err);
912 }
913 }
914 Pezpallet::<T>::process_hrmp_open_channel_requests(&host_config);
915}
916
917fn preopen_hrmp_channel<T: Config>(
918 sender: ParaId,
919 recipient: ParaId,
920 max_capacity: u32,
921 max_message_size: u32,
922) -> DispatchResult {
923 Pezpallet::<T>::init_open_channel(sender, recipient, max_capacity, max_message_size)?;
924 Pezpallet::<T>::accept_open_channel(recipient, sender)?;
925 Ok(())
926}
927
928impl<T: Config> Pezpallet<T> {
930 pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
932 Weight::zero()
933 }
934
935 pub(crate) fn initializer_finalize() {}
937
938 pub(crate) fn initializer_on_new_session(
940 notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
941 outgoing_paras: &[ParaId],
942 ) -> Weight {
943 let w1 = Self::perform_outgoing_para_cleanup(¬ification.prev_config, outgoing_paras);
944 Self::process_hrmp_open_channel_requests(¬ification.prev_config);
945 Self::process_hrmp_close_channel_requests();
946 w1.saturating_add(<T as Config>::WeightInfo::force_process_hrmp_open(
947 outgoing_paras.len() as u32
948 ))
949 .saturating_add(<T as Config>::WeightInfo::force_process_hrmp_close(
950 outgoing_paras.len() as u32,
951 ))
952 }
953
954 fn perform_outgoing_para_cleanup(
957 config: &HostConfiguration<BlockNumberFor<T>>,
958 outgoing: &[ParaId],
959 ) -> Weight {
960 let mut w = Self::clean_open_channel_requests(config, outgoing);
961 for outgoing_para in outgoing {
962 Self::clean_hrmp_after_outgoing(outgoing_para);
963
964 let ingress_count =
967 HrmpIngressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
968 let egress_count =
969 HrmpEgressChannelsIndex::<T>::decode_len(outgoing_para).unwrap_or_default() as u32;
970 w = w.saturating_add(<T as Config>::WeightInfo::force_clean_hrmp(
971 ingress_count,
972 egress_count,
973 ));
974 }
975 w
976 }
977
978 pub(crate) fn clean_open_channel_requests(
982 config: &HostConfiguration<BlockNumberFor<T>>,
983 outgoing: &[ParaId],
984 ) -> Weight {
985 let open_channel_reqs = HrmpOpenChannelRequestsList::<T>::get();
991 let (go, stay): (Vec<HrmpChannelId>, Vec<HrmpChannelId>) = open_channel_reqs
992 .into_iter()
993 .partition(|req_id| outgoing.iter().any(|id| req_id.is_participant(*id)));
994 HrmpOpenChannelRequestsList::<T>::put(stay);
995
996 for req_id in go {
999 let req_data = match HrmpOpenChannelRequests::<T>::take(&req_id) {
1000 Some(req_data) => req_data,
1001 None => {
1002 continue;
1004 },
1005 };
1006
1007 if !outgoing.contains(&req_id.sender) {
1009 T::Currency::unreserve(
1010 &req_id.sender.into_account_truncating(),
1011 req_data.sender_deposit.unique_saturated_into(),
1012 );
1013 }
1014
1015 if req_data.confirmed {
1021 if !outgoing.contains(&req_id.recipient) {
1022 T::Currency::unreserve(
1023 &req_id.recipient.into_account_truncating(),
1024 config.hrmp_recipient_deposit.unique_saturated_into(),
1025 );
1026 }
1027 Self::decrease_accepted_channel_request_count(req_id.recipient);
1028 }
1029 }
1030
1031 <T as Config>::WeightInfo::clean_open_channel_requests(outgoing.len() as u32)
1032 }
1033
1034 fn clean_hrmp_after_outgoing(outgoing_para: &ParaId) {
1036 HrmpOpenChannelRequestCount::<T>::remove(outgoing_para);
1037 HrmpAcceptedChannelRequestCount::<T>::remove(outgoing_para);
1038
1039 let ingress = HrmpIngressChannelsIndex::<T>::take(outgoing_para)
1040 .into_iter()
1041 .map(|sender| HrmpChannelId { sender, recipient: *outgoing_para });
1042 let egress = HrmpEgressChannelsIndex::<T>::take(outgoing_para)
1043 .into_iter()
1044 .map(|recipient| HrmpChannelId { sender: *outgoing_para, recipient });
1045 let mut to_close = ingress.chain(egress).collect::<Vec<_>>();
1046 to_close.sort();
1047 to_close.dedup();
1048
1049 for channel in to_close {
1050 Self::close_hrmp_channel(&channel);
1051 }
1052 }
1053
1054 fn process_hrmp_open_channel_requests(config: &HostConfiguration<BlockNumberFor<T>>) {
1059 let mut open_req_channels = HrmpOpenChannelRequestsList::<T>::get();
1060 if open_req_channels.is_empty() {
1061 return;
1062 }
1063
1064 let mut idx = open_req_channels.len();
1067 loop {
1068 if idx == 0 {
1070 break;
1071 }
1072
1073 idx -= 1;
1074 let channel_id = open_req_channels[idx].clone();
1075 let request = HrmpOpenChannelRequests::<T>::get(&channel_id).expect(
1076 "can't be `None` due to the invariant that the list contains the same items as the set; qed",
1077 );
1078
1079 let system_channel = channel_id.sender.is_system() || channel_id.recipient.is_system();
1080 let sender_deposit = request.sender_deposit;
1081 let recipient_deposit = if system_channel { 0 } else { config.hrmp_recipient_deposit };
1082
1083 if request.confirmed {
1084 if paras::Pezpallet::<T>::is_valid_para(channel_id.sender)
1085 && paras::Pezpallet::<T>::is_valid_para(channel_id.recipient)
1086 {
1087 HrmpChannels::<T>::insert(
1088 &channel_id,
1089 HrmpChannel {
1090 sender_deposit,
1091 recipient_deposit,
1092 max_capacity: request.max_capacity,
1093 max_total_size: request.max_total_size,
1094 max_message_size: request.max_message_size,
1095 msg_count: 0,
1096 total_size: 0,
1097 mqc_head: None,
1098 },
1099 );
1100
1101 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1102 if let Err(i) = v.binary_search(&channel_id.sender) {
1103 v.insert(i, channel_id.sender);
1104 }
1105 });
1106 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1107 if let Err(i) = v.binary_search(&channel_id.recipient) {
1108 v.insert(i, channel_id.recipient);
1109 }
1110 });
1111 }
1112
1113 Self::decrease_open_channel_request_count(channel_id.sender);
1114 Self::decrease_accepted_channel_request_count(channel_id.recipient);
1115
1116 let _ = open_req_channels.swap_remove(idx);
1117 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1118 }
1119 }
1120
1121 HrmpOpenChannelRequestsList::<T>::put(open_req_channels);
1122 }
1123
1124 fn process_hrmp_close_channel_requests() {
1126 let close_reqs = HrmpCloseChannelRequestsList::<T>::take();
1127 for condemned_ch_id in close_reqs {
1128 HrmpCloseChannelRequests::<T>::remove(&condemned_ch_id);
1129 Self::close_hrmp_channel(&condemned_ch_id);
1130 }
1131 }
1132
1133 fn close_hrmp_channel(channel_id: &HrmpChannelId) {
1140 if let Some(HrmpChannel { sender_deposit, recipient_deposit, .. }) =
1141 HrmpChannels::<T>::take(channel_id)
1142 {
1143 T::Currency::unreserve(
1144 &channel_id.sender.into_account_truncating(),
1145 sender_deposit.unique_saturated_into(),
1146 );
1147 T::Currency::unreserve(
1148 &channel_id.recipient.into_account_truncating(),
1149 recipient_deposit.unique_saturated_into(),
1150 );
1151 }
1152
1153 HrmpChannelContents::<T>::remove(channel_id);
1154
1155 HrmpEgressChannelsIndex::<T>::mutate(&channel_id.sender, |v| {
1156 if let Ok(i) = v.binary_search(&channel_id.recipient) {
1157 v.remove(i);
1158 }
1159 });
1160 HrmpIngressChannelsIndex::<T>::mutate(&channel_id.recipient, |v| {
1161 if let Ok(i) = v.binary_search(&channel_id.sender) {
1162 v.remove(i);
1163 }
1164 });
1165 }
1166
1167 pub(crate) fn check_hrmp_watermark(
1169 recipient: ParaId,
1170 relay_chain_parent_number: BlockNumberFor<T>,
1171 new_hrmp_watermark: BlockNumberFor<T>,
1172 ) -> Result<(), HrmpWatermarkAcceptanceErr<BlockNumberFor<T>>> {
1173 if new_hrmp_watermark == relay_chain_parent_number {
1182 return Ok(());
1183 }
1184
1185 if new_hrmp_watermark > relay_chain_parent_number {
1186 return Err(HrmpWatermarkAcceptanceErr::AheadRelayParent {
1187 new_watermark: new_hrmp_watermark,
1188 relay_chain_parent_number,
1189 });
1190 }
1191
1192 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1193 if new_hrmp_watermark < last_watermark {
1194 return Err(HrmpWatermarkAcceptanceErr::AdvancementRule {
1195 new_watermark: new_hrmp_watermark,
1196 last_watermark,
1197 });
1198 }
1199
1200 if new_hrmp_watermark == last_watermark {
1201 return Ok(());
1202 }
1203 }
1204
1205 let digest = HrmpChannelDigests::<T>::get(&recipient);
1210 if !digest
1211 .binary_search_by_key(&new_hrmp_watermark, |(block_no, _)| *block_no)
1212 .is_ok()
1213 {
1214 return Err(HrmpWatermarkAcceptanceErr::LandsOnBlockWithNoMessages {
1215 new_watermark: new_hrmp_watermark,
1216 });
1217 }
1218 Ok(())
1219 }
1220
1221 pub(crate) fn valid_watermarks(recipient: ParaId) -> Vec<BlockNumberFor<T>> {
1223 let mut valid_watermarks: Vec<_> = HrmpChannelDigests::<T>::get(&recipient)
1224 .into_iter()
1225 .map(|(block_no, _)| block_no)
1226 .collect();
1227
1228 if let Some(last_watermark) = HrmpWatermarks::<T>::get(&recipient) {
1230 if valid_watermarks.first().map_or(false, |w| w > &last_watermark) {
1231 valid_watermarks.insert(0, last_watermark);
1232 }
1233 }
1234
1235 valid_watermarks
1236 }
1237
1238 pub(crate) fn check_outbound_hrmp(
1239 config: &HostConfiguration<BlockNumberFor<T>>,
1240 sender: ParaId,
1241 out_hrmp_msgs: &[OutboundHrmpMessage<ParaId>],
1242 ) -> Result<(), OutboundHrmpAcceptanceErr> {
1243 if out_hrmp_msgs.len() as u32 > config.hrmp_max_message_num_per_candidate {
1244 return Err(OutboundHrmpAcceptanceErr::MoreMessagesThanPermitted {
1245 sent: out_hrmp_msgs.len() as u32,
1246 permitted: config.hrmp_max_message_num_per_candidate,
1247 });
1248 }
1249
1250 let mut last_recipient = None::<ParaId>;
1251
1252 for (idx, out_msg) in
1253 out_hrmp_msgs.iter().enumerate().map(|(idx, out_msg)| (idx as u32, out_msg))
1254 {
1255 match last_recipient {
1256 Some(last_recipient) if out_msg.recipient <= last_recipient => {
1260 return Err(OutboundHrmpAcceptanceErr::NotSorted { idx })
1261 },
1262 _ => last_recipient = Some(out_msg.recipient),
1263 }
1264
1265 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1266
1267 let channel = match HrmpChannels::<T>::get(&channel_id) {
1268 Some(channel) => channel,
1269 None => return Err(OutboundHrmpAcceptanceErr::NoSuchChannel { channel_id, idx }),
1270 };
1271
1272 let msg_size = out_msg.data.len() as u32;
1273 if msg_size > channel.max_message_size {
1274 return Err(OutboundHrmpAcceptanceErr::MaxMessageSizeExceeded {
1275 idx,
1276 msg_size,
1277 max_size: channel.max_message_size,
1278 });
1279 }
1280
1281 let new_total_size = channel.total_size + out_msg.data.len() as u32;
1282 if new_total_size > channel.max_total_size {
1283 return Err(OutboundHrmpAcceptanceErr::TotalSizeExceeded {
1284 idx,
1285 total_size: new_total_size,
1286 limit: channel.max_total_size,
1287 });
1288 }
1289
1290 let new_msg_count = channel.msg_count + 1;
1291 if new_msg_count > channel.max_capacity {
1292 return Err(OutboundHrmpAcceptanceErr::CapacityExceeded {
1293 idx,
1294 count: new_msg_count,
1295 limit: channel.max_capacity,
1296 });
1297 }
1298 }
1299
1300 Ok(())
1301 }
1302
1303 pub(crate) fn outbound_remaining_capacity(sender: ParaId) -> Vec<(ParaId, (u32, u32))> {
1305 let recipients = HrmpEgressChannelsIndex::<T>::get(&sender);
1306 let mut remaining = Vec::with_capacity(recipients.len());
1307
1308 for recipient in recipients {
1309 let Some(channel) = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient }) else {
1310 continue;
1311 };
1312 remaining.push((
1313 recipient,
1314 (
1315 channel.max_capacity - channel.msg_count,
1316 channel.max_total_size - channel.total_size,
1317 ),
1318 ));
1319 }
1320
1321 remaining
1322 }
1323
1324 pub(crate) fn prune_hrmp(recipient: ParaId, new_hrmp_watermark: BlockNumberFor<T>) {
1325 let senders = HrmpChannelDigests::<T>::mutate(&recipient, |digest| {
1328 let mut senders = BTreeSet::new();
1329 let mut leftover = Vec::with_capacity(digest.len());
1330 for (block_no, paras_sent_msg) in mem::replace(digest, Vec::new()) {
1331 if block_no <= new_hrmp_watermark {
1332 senders.extend(paras_sent_msg);
1333 } else {
1334 leftover.push((block_no, paras_sent_msg));
1335 }
1336 }
1337 *digest = leftover;
1338 senders
1339 });
1340
1341 let channels_to_prune =
1343 senders.into_iter().map(|sender| HrmpChannelId { sender, recipient });
1344 for channel_id in channels_to_prune {
1345 let (mut pruned_cnt, mut pruned_size) = (0, 0);
1348
1349 let contents = HrmpChannelContents::<T>::get(&channel_id);
1350 let mut leftover = Vec::with_capacity(contents.len());
1351 for msg in contents {
1352 if msg.sent_at <= new_hrmp_watermark {
1353 pruned_cnt += 1;
1354 pruned_size += msg.data.len();
1355 } else {
1356 leftover.push(msg);
1357 }
1358 }
1359 if !leftover.is_empty() {
1360 HrmpChannelContents::<T>::insert(&channel_id, leftover);
1361 } else {
1362 HrmpChannelContents::<T>::remove(&channel_id);
1363 }
1364
1365 HrmpChannels::<T>::mutate(&channel_id, |channel| {
1367 if let Some(ref mut channel) = channel {
1368 channel.msg_count -= pruned_cnt as u32;
1369 channel.total_size -= pruned_size as u32;
1370 }
1371 });
1372 }
1373
1374 HrmpWatermarks::<T>::insert(&recipient, new_hrmp_watermark);
1375 }
1376
1377 pub(crate) fn queue_outbound_hrmp(sender: ParaId, out_hrmp_msgs: HorizontalMessages) {
1379 let now = pezframe_system::Pezpallet::<T>::block_number();
1380
1381 for out_msg in out_hrmp_msgs {
1382 let channel_id = HrmpChannelId { sender, recipient: out_msg.recipient };
1383
1384 let mut channel = match HrmpChannels::<T>::get(&channel_id) {
1385 Some(channel) => channel,
1386 None => {
1387 continue;
1390 },
1391 };
1392
1393 let inbound = InboundHrmpMessage { sent_at: now, data: out_msg.data };
1394
1395 channel.msg_count += 1;
1397 channel.total_size += inbound.data.len() as u32;
1398
1399 let prev_head = channel.mqc_head.unwrap_or(Default::default());
1401 let new_head = BlakeTwo256::hash_of(&(
1402 prev_head,
1403 inbound.sent_at,
1404 T::Hashing::hash_of(&inbound.data),
1405 ));
1406 channel.mqc_head = Some(new_head);
1407
1408 HrmpChannels::<T>::insert(&channel_id, channel);
1409 HrmpChannelContents::<T>::append(&channel_id, inbound);
1410
1411 let mut recipient_digest = HrmpChannelDigests::<T>::get(&channel_id.recipient);
1424 if let Some(cur_block_digest) = recipient_digest
1425 .last_mut()
1426 .filter(|(block_no, _)| *block_no == now)
1427 .map(|(_, ref mut d)| d)
1428 {
1429 cur_block_digest.push(sender);
1430 } else {
1431 recipient_digest.push((now, vec![sender]));
1432 }
1433 HrmpChannelDigests::<T>::insert(&channel_id.recipient, recipient_digest);
1434 }
1435 }
1436
1437 pub fn init_open_channel(
1445 origin: ParaId,
1446 recipient: ParaId,
1447 proposed_max_capacity: u32,
1448 proposed_max_message_size: u32,
1449 ) -> DispatchResult {
1450 ensure!(origin != recipient, Error::<T>::OpenHrmpChannelToSelf);
1451 ensure!(
1452 paras::Pezpallet::<T>::is_valid_para(recipient),
1453 Error::<T>::OpenHrmpChannelInvalidRecipient,
1454 );
1455
1456 let config = configuration::ActiveConfig::<T>::get();
1457 ensure!(proposed_max_capacity > 0, Error::<T>::OpenHrmpChannelZeroCapacity);
1458 ensure!(
1459 proposed_max_capacity <= config.hrmp_channel_max_capacity,
1460 Error::<T>::OpenHrmpChannelCapacityExceedsLimit,
1461 );
1462 ensure!(proposed_max_message_size > 0, Error::<T>::OpenHrmpChannelZeroMessageSize);
1463 ensure!(
1464 proposed_max_message_size <= config.hrmp_channel_max_message_size,
1465 Error::<T>::OpenHrmpChannelMessageSizeExceedsLimit,
1466 );
1467
1468 let channel_id = HrmpChannelId { sender: origin, recipient };
1469 ensure!(
1470 HrmpOpenChannelRequests::<T>::get(&channel_id).is_none(),
1471 Error::<T>::OpenHrmpChannelAlreadyRequested,
1472 );
1473 ensure!(
1474 HrmpChannels::<T>::get(&channel_id).is_none(),
1475 Error::<T>::OpenHrmpChannelAlreadyExists,
1476 );
1477
1478 let egress_cnt = HrmpEgressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1479 let open_req_cnt = HrmpOpenChannelRequestCount::<T>::get(&origin);
1480 let channel_num_limit = config.hrmp_max_teyrchain_outbound_channels;
1481 ensure!(
1482 egress_cnt + open_req_cnt < channel_num_limit,
1483 Error::<T>::OpenHrmpChannelLimitExceeded,
1484 );
1485
1486 let is_system = origin.is_system() || recipient.is_system();
1488 let deposit = if is_system { 0 } else { config.hrmp_sender_deposit };
1489 if !deposit.is_zero() {
1490 T::Currency::reserve(
1491 &origin.into_account_truncating(),
1492 deposit.unique_saturated_into(),
1493 )?;
1494 }
1495
1496 HrmpOpenChannelRequestCount::<T>::insert(&origin, open_req_cnt + 1);
1499 HrmpOpenChannelRequests::<T>::insert(
1500 &channel_id,
1501 HrmpOpenChannelRequest {
1502 confirmed: false,
1503 _age: 0,
1504 sender_deposit: deposit,
1505 max_capacity: proposed_max_capacity,
1506 max_message_size: proposed_max_message_size,
1507 max_total_size: config.hrmp_channel_max_total_size,
1508 },
1509 );
1510 HrmpOpenChannelRequestsList::<T>::append(channel_id);
1511
1512 Self::send_to_para(
1513 "init_open_channel",
1514 &config,
1515 recipient,
1516 Self::wrap_notification(|| {
1517 use xcm::opaque::latest::{prelude::*, Xcm};
1518 Xcm(vec![HrmpNewChannelOpenRequest {
1519 sender: origin.into(),
1520 max_capacity: proposed_max_capacity,
1521 max_message_size: proposed_max_message_size,
1522 }])
1523 }),
1524 );
1525
1526 Ok(())
1527 }
1528
1529 pub fn accept_open_channel(origin: ParaId, sender: ParaId) -> DispatchResult {
1534 let channel_id = HrmpChannelId { sender, recipient: origin };
1535 let mut channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1536 .ok_or(Error::<T>::AcceptHrmpChannelDoesntExist)?;
1537 ensure!(!channel_req.confirmed, Error::<T>::AcceptHrmpChannelAlreadyConfirmed);
1538
1539 let config = configuration::ActiveConfig::<T>::get();
1542 let channel_num_limit = config.hrmp_max_teyrchain_inbound_channels;
1543 let ingress_cnt = HrmpIngressChannelsIndex::<T>::decode_len(&origin).unwrap_or(0) as u32;
1544 let accepted_cnt = HrmpAcceptedChannelRequestCount::<T>::get(&origin);
1545 ensure!(
1546 ingress_cnt + accepted_cnt < channel_num_limit,
1547 Error::<T>::AcceptHrmpChannelLimitExceeded,
1548 );
1549
1550 let is_system = origin.is_system() || sender.is_system();
1552 let deposit = if is_system { 0 } else { config.hrmp_recipient_deposit };
1553 if !deposit.is_zero() {
1554 T::Currency::reserve(
1555 &origin.into_account_truncating(),
1556 deposit.unique_saturated_into(),
1557 )?;
1558 }
1559
1560 channel_req.confirmed = true;
1563 HrmpOpenChannelRequests::<T>::insert(&channel_id, channel_req);
1564 HrmpAcceptedChannelRequestCount::<T>::insert(&origin, accepted_cnt + 1);
1565
1566 Self::send_to_para(
1567 "accept_open_channel",
1568 &config,
1569 sender,
1570 Self::wrap_notification(|| {
1571 use xcm::opaque::latest::{prelude::*, Xcm};
1572 Xcm(vec![HrmpChannelAccepted { recipient: origin.into() }])
1573 }),
1574 );
1575
1576 Ok(())
1577 }
1578
1579 fn cancel_open_request(origin: ParaId, channel_id: HrmpChannelId) -> DispatchResult {
1580 ensure!(channel_id.is_participant(origin), Error::<T>::CancelHrmpOpenChannelUnauthorized);
1582
1583 let open_channel_req = HrmpOpenChannelRequests::<T>::get(&channel_id)
1584 .ok_or(Error::<T>::OpenHrmpChannelDoesntExist)?;
1585 ensure!(!open_channel_req.confirmed, Error::<T>::OpenHrmpChannelAlreadyConfirmed);
1586
1587 HrmpOpenChannelRequests::<T>::remove(&channel_id);
1589 HrmpOpenChannelRequestsList::<T>::mutate(|open_req_channels| {
1590 if let Some(pos) = open_req_channels.iter().position(|x| x == &channel_id) {
1591 open_req_channels.swap_remove(pos);
1592 }
1593 });
1594
1595 Self::decrease_open_channel_request_count(channel_id.sender);
1596 T::Currency::unreserve(
1602 &channel_id.sender.into_account_truncating(),
1603 open_channel_req.sender_deposit.unique_saturated_into(),
1604 );
1605
1606 Ok(())
1607 }
1608
1609 fn close_channel(origin: ParaId, channel_id: HrmpChannelId) -> Result<(), Error<T>> {
1610 ensure!(channel_id.is_participant(origin), Error::<T>::CloseHrmpChannelUnauthorized);
1612
1613 ensure!(
1615 HrmpChannels::<T>::get(&channel_id).is_some(),
1616 Error::<T>::CloseHrmpChannelDoesntExist,
1617 );
1618
1619 ensure!(
1621 HrmpCloseChannelRequests::<T>::get(&channel_id).is_none(),
1622 Error::<T>::CloseHrmpChannelAlreadyUnderway,
1623 );
1624
1625 HrmpCloseChannelRequests::<T>::insert(&channel_id, ());
1626 HrmpCloseChannelRequestsList::<T>::append(channel_id.clone());
1627
1628 let config = configuration::ActiveConfig::<T>::get();
1629 let opposite_party =
1630 if origin == channel_id.sender { channel_id.recipient } else { channel_id.sender };
1631
1632 Self::send_to_para(
1633 "close_channel",
1634 &config,
1635 opposite_party,
1636 Self::wrap_notification(|| {
1637 use xcm::opaque::latest::{prelude::*, Xcm};
1638 Xcm(vec![HrmpChannelClosing {
1639 initiator: origin.into(),
1640 sender: channel_id.sender.into(),
1641 recipient: channel_id.recipient.into(),
1642 }])
1643 }),
1644 );
1645
1646 Ok(())
1647 }
1648
1649 #[cfg(test)]
1653 fn hrmp_mqc_heads(recipient: ParaId) -> Vec<(ParaId, Hash)> {
1654 let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1655
1656 let mut mqc_heads = Vec::with_capacity(sender_set.len());
1658 for sender in sender_set {
1659 let channel_metadata = HrmpChannels::<T>::get(&HrmpChannelId { sender, recipient });
1660 let mqc_head = channel_metadata
1661 .and_then(|metadata| metadata.mqc_head)
1662 .unwrap_or(Hash::default());
1663 mqc_heads.push((sender, mqc_head));
1664 }
1665
1666 mqc_heads
1667 }
1668
1669 pub(crate) fn inbound_hrmp_channels_contents(
1672 recipient: ParaId,
1673 ) -> BTreeMap<ParaId, Vec<InboundHrmpMessage<BlockNumberFor<T>>>> {
1674 let sender_set = HrmpIngressChannelsIndex::<T>::get(&recipient);
1675
1676 let mut inbound_hrmp_channels_contents = BTreeMap::new();
1677 for sender in sender_set {
1678 let channel_contents =
1679 HrmpChannelContents::<T>::get(&HrmpChannelId { sender, recipient });
1680 inbound_hrmp_channels_contents.insert(sender, channel_contents);
1681 }
1682
1683 inbound_hrmp_channels_contents
1684 }
1685}
1686
1687impl<T: Config> Pezpallet<T> {
1688 fn decrease_open_channel_request_count(sender: ParaId) {
1691 HrmpOpenChannelRequestCount::<T>::mutate_exists(&sender, |opt_rc| {
1692 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1693 0 => None,
1694 n => Some(n),
1695 });
1696 });
1697 }
1698
1699 fn decrease_accepted_channel_request_count(recipient: ParaId) {
1702 HrmpAcceptedChannelRequestCount::<T>::mutate_exists(&recipient, |opt_rc| {
1703 *opt_rc = opt_rc.and_then(|rc| match rc.saturating_sub(1) {
1704 0 => None,
1705 n => Some(n),
1706 });
1707 });
1708 }
1709
1710 #[cfg(any(feature = "runtime-benchmarks", test))]
1711 fn assert_storage_consistency_exhaustive() {
1712 fn assert_is_sorted<T: Ord>(slice: &[T], id: &str) {
1713 assert!(slice.windows(2).all(|xs| xs[0] <= xs[1]), "{} supposed to be sorted", id);
1714 }
1715
1716 let assert_contains_only_onboarded = |paras: Vec<ParaId>, cause: &str| {
1717 for para in paras {
1718 assert!(
1719 crate::paras::Pezpallet::<T>::is_valid_para(para),
1720 "{}: {:?} para is offboarded",
1721 cause,
1722 para
1723 );
1724 }
1725 };
1726
1727 assert_eq!(
1728 HrmpOpenChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1729 HrmpOpenChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1730 );
1731
1732 assert_eq!(
1737 HrmpOpenChannelRequestCount::<T>::iter()
1738 .map(|(k, _)| k)
1739 .collect::<BTreeSet<_>>(),
1740 HrmpOpenChannelRequests::<T>::iter()
1741 .map(|(k, _)| k.sender)
1742 .collect::<BTreeSet<_>>(),
1743 );
1744 for (open_channel_initiator, expected_num) in HrmpOpenChannelRequestCount::<T>::iter() {
1745 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1746 .filter(|(ch, _)| ch.sender == open_channel_initiator)
1747 .count() as u32;
1748 assert_eq!(expected_num, actual_num);
1749 }
1750
1751 assert_eq!(
1754 HrmpAcceptedChannelRequestCount::<T>::iter()
1755 .map(|(k, _)| k)
1756 .collect::<BTreeSet<_>>(),
1757 HrmpOpenChannelRequests::<T>::iter()
1758 .filter(|(_, v)| v.confirmed)
1759 .map(|(k, _)| k.recipient)
1760 .collect::<BTreeSet<_>>(),
1761 );
1762 for (channel_recipient, expected_num) in HrmpAcceptedChannelRequestCount::<T>::iter() {
1763 let actual_num = HrmpOpenChannelRequests::<T>::iter()
1764 .filter(|(ch, v)| ch.recipient == channel_recipient && v.confirmed)
1765 .count() as u32;
1766 assert_eq!(expected_num, actual_num);
1767 }
1768
1769 assert_eq!(
1770 HrmpCloseChannelRequests::<T>::iter().map(|(k, _)| k).collect::<BTreeSet<_>>(),
1771 HrmpCloseChannelRequestsList::<T>::get().into_iter().collect::<BTreeSet<_>>(),
1772 );
1773
1774 assert_contains_only_onboarded(
1777 HrmpWatermarks::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1778 "HRMP watermarks should contain only onboarded paras",
1779 );
1780
1781 for (non_empty_channel, contents) in HrmpChannelContents::<T>::iter() {
1784 assert!(HrmpChannels::<T>::contains_key(&non_empty_channel));
1785
1786 assert!(!contents.is_empty());
1789 }
1790
1791 assert_contains_only_onboarded(
1794 HrmpChannels::<T>::iter()
1795 .flat_map(|(k, _)| vec![k.sender, k.recipient])
1796 .collect::<Vec<_>>(),
1797 "senders and recipients in all channels should be onboarded",
1798 );
1799
1800 let channel_set_derived_from_ingress = HrmpIngressChannelsIndex::<T>::iter()
1820 .flat_map(|(p, v)| v.into_iter().map(|i| (i, p)).collect::<Vec<_>>())
1821 .collect::<BTreeSet<_>>();
1822 let channel_set_derived_from_egress = HrmpEgressChannelsIndex::<T>::iter()
1823 .flat_map(|(p, v)| v.into_iter().map(|e| (p, e)).collect::<Vec<_>>())
1824 .collect::<BTreeSet<_>>();
1825 let channel_set_ground_truth = HrmpChannels::<T>::iter()
1826 .map(|(k, _)| (k.sender, k.recipient))
1827 .collect::<BTreeSet<_>>();
1828 assert_eq!(channel_set_derived_from_ingress, channel_set_derived_from_egress);
1829 assert_eq!(channel_set_derived_from_egress, channel_set_ground_truth);
1830
1831 HrmpIngressChannelsIndex::<T>::iter()
1832 .map(|(_, v)| v)
1833 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1834 HrmpEgressChannelsIndex::<T>::iter()
1835 .map(|(_, v)| v)
1836 .for_each(|v| assert_is_sorted(&v, "HrmpIngressChannelsIndex"));
1837
1838 assert_contains_only_onboarded(
1839 HrmpChannelDigests::<T>::iter().map(|(k, _)| k).collect::<Vec<_>>(),
1840 "HRMP channel digests should contain only onboarded paras",
1841 );
1842 for (_digest_for_para, digest) in HrmpChannelDigests::<T>::iter() {
1843 assert!(digest.windows(2).all(|xs| xs[0].0 < xs[1].0));
1846
1847 for (_, mut senders) in digest {
1848 assert!(!senders.is_empty());
1849
1850 senders.sort();
1853 let orig_senders = senders.clone();
1854 senders.dedup();
1855 assert_eq!(
1856 orig_senders, senders,
1857 "duplicates removed implies existence of duplicates"
1858 );
1859 }
1860 }
1861 }
1862}
1863
1864impl<T: Config> Pezpallet<T> {
1865 fn wrap_notification(
1868 mut notification: impl FnMut() -> xcm::opaque::latest::opaque::Xcm,
1869 ) -> impl FnOnce(ParaId) -> pezkuwi_primitives::DownwardMessage {
1870 use xcm::{
1871 opaque::VersionedXcm,
1872 prelude::{Junction, Location},
1873 WrapVersion,
1874 };
1875
1876 move |dest| {
1878 T::VersionWrapper::wrap_version(
1880 &Location::new(0, [Junction::Teyrchain(dest.into())]),
1881 notification(),
1882 )
1883 .unwrap_or_else(|_| {
1884 VersionedXcm::from(notification())
1887 })
1888 .encode()
1889 }
1890 }
1891
1892 fn send_to_para(
1894 log_label: &str,
1895 config: &HostConfiguration<BlockNumberFor<T>>,
1896 dest: ParaId,
1897 notification_bytes_for: impl FnOnce(ParaId) -> pezkuwi_primitives::DownwardMessage,
1898 ) {
1899 let notification_bytes = notification_bytes_for(dest);
1901
1902 if let Err(dmp::QueueDownwardMessageError::ExceedsMaxMessageSize) =
1904 dmp::Pezpallet::<T>::queue_downward_message(&config, dest, notification_bytes)
1905 {
1906 log::error!(
1909 target: "runtime::hrmp",
1910 "sending '{log_label}::notification_bytes' failed."
1911 );
1912 debug_assert!(false);
1913 }
1914 }
1915}