1#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::BoundedBTreeSet;
60use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62 relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63 ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67 defensive, defensive_assert,
68 traits::{
69 Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70 QueuePausedQuery,
71 },
72 weights::{Weight, WeightMeter},
73 BoundedVec,
74};
75use pallet_message_queue::OnQueueChanged;
76use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
77use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
78use scale_info::TypeInfo;
79use sp_core::MAX_POSSIBLE_ALLOCATION;
80use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
81use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
82use xcm_builder::InspectMessageQueues;
83use xcm_executor::traits::ConvertOrigin;
84
85pub use pallet::*;
86
87pub type OverweightIndex = u64;
89pub type MaxXcmpMessageLenOf<T> =
91 <<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
92
93const LOG_TARGET: &str = "xcmp_queue";
94const DEFAULT_POV_SIZE: u64 = 64 * 1024; pub const XCM_BATCH_SIZE: usize = 250;
97pub const MAX_SIGNALS_PER_PAGE: usize = 3;
99
100pub mod delivery_fee_constants {
102 pub const THRESHOLD_FACTOR: u32 = 2;
104}
105
106#[frame_support::pallet]
107pub mod pallet {
108 use super::*;
109 use frame_support::{pallet_prelude::*, Twox64Concat};
110 use frame_system::pallet_prelude::*;
111
112 #[pallet::pallet]
113 #[pallet::storage_version(migration::STORAGE_VERSION)]
114 pub struct Pallet<T>(_);
115
116 #[pallet::config]
117 pub trait Config: frame_system::Config {
118 #[allow(deprecated)]
119 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
120
121 type ChannelInfo: GetChannelInfo;
123
124 type VersionWrapper: WrapVersion;
126
127 type XcmpQueue: EnqueueMessage<ParaId>
132 + QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
133
134 #[pallet::constant]
140 type MaxInboundSuspended: Get<u32>;
141
142 #[pallet::constant]
151 type MaxActiveOutboundChannels: Get<u32>;
152
153 #[pallet::constant]
159 type MaxPageSize: Get<u32>;
160
161 type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
163
164 type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
167
168 type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
170
171 type WeightInfo: WeightInfoExt;
173 }
174
175 #[pallet::call]
176 impl<T: Config> Pallet<T> {
177 #[pallet::call_index(1)]
181 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
182 pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
183 T::ControllerOrigin::ensure_origin(origin)?;
184
185 QueueSuspended::<T>::try_mutate(|suspended| {
186 if *suspended {
187 Err(Error::<T>::AlreadySuspended.into())
188 } else {
189 *suspended = true;
190 Ok(())
191 }
192 })
193 }
194
195 #[pallet::call_index(2)]
201 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
202 pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
203 T::ControllerOrigin::ensure_origin(origin)?;
204
205 QueueSuspended::<T>::try_mutate(|suspended| {
206 if !*suspended {
207 Err(Error::<T>::AlreadyResumed.into())
208 } else {
209 *suspended = false;
210 Ok(())
211 }
212 })
213 }
214
215 #[pallet::call_index(3)]
221 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
222 pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
223 ensure_root(origin)?;
224
225 QueueConfig::<T>::try_mutate(|data| {
226 data.suspend_threshold = new;
227 data.validate::<T>()
228 })
229 }
230
231 #[pallet::call_index(4)]
237 #[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
238 pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
239 ensure_root(origin)?;
240
241 QueueConfig::<T>::try_mutate(|data| {
242 data.drop_threshold = new;
243 data.validate::<T>()
244 })
245 }
246
247 #[pallet::call_index(5)]
253 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
254 pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
255 ensure_root(origin)?;
256
257 QueueConfig::<T>::try_mutate(|data| {
258 data.resume_threshold = new;
259 data.validate::<T>()
260 })
261 }
262 }
263
264 #[pallet::hooks]
265 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
266 fn integrity_test() {
267 assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
268
269 let w = Self::on_idle_weight();
270 assert!(w != Weight::zero());
271 assert!(w.all_lte(T::BlockWeights::get().max_block));
272
273 <T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
274 }
275
276 fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
277 let mut meter = WeightMeter::with_limit(limit);
278
279 if meter.try_consume(Self::on_idle_weight()).is_err() {
280 log::debug!(
281 "Not enough weight for on_idle. {} < {}",
282 Self::on_idle_weight(),
283 limit
284 );
285 return meter.consumed()
286 }
287
288 migration::v3::lazy_migrate_inbound_queue::<T>();
289
290 meter.consumed()
291 }
292 }
293
294 #[pallet::event]
295 #[pallet::generate_deposit(pub(super) fn deposit_event)]
296 pub enum Event<T: Config> {
297 XcmpMessageSent { message_hash: XcmHash },
299 }
300
301 #[pallet::error]
302 pub enum Error<T> {
303 BadQueueConfig,
305 AlreadySuspended,
307 AlreadyResumed,
309 TooManyActiveOutboundChannels,
311 TooBig,
313 }
314
315 #[pallet::storage]
324 pub type InboundXcmpSuspended<T: Config> =
325 StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
326
327 #[pallet::storage]
334 pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
335 _,
336 BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
337 ValueQuery,
338 >;
339
340 #[pallet::storage]
342 pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
343 _,
344 Blake2_128Concat,
345 ParaId,
346 Twox64Concat,
347 u16,
348 WeakBoundedVec<u8, T::MaxPageSize>,
349 ValueQuery,
350 >;
351
352 #[pallet::storage]
354 pub(super) type SignalMessages<T: Config> =
355 StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
356
357 #[pallet::storage]
359 pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
360
361 #[pallet::storage]
363 pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
364
365 #[pallet::storage]
367 pub(super) type DeliveryFeeFactor<T: Config> =
368 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
369}
370
371#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
372pub enum OutboundState {
373 Ok,
374 Suspended,
375}
376
377#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
379pub struct OutboundChannelDetails {
380 recipient: ParaId,
382 state: OutboundState,
384 signals_exist: bool,
386 first_index: u16,
388 last_index: u16,
390}
391
392impl OutboundChannelDetails {
393 pub fn new(recipient: ParaId) -> OutboundChannelDetails {
394 OutboundChannelDetails {
395 recipient,
396 state: OutboundState::Ok,
397 signals_exist: false,
398 first_index: 0,
399 last_index: 0,
400 }
401 }
402
403 pub fn with_signals(mut self) -> OutboundChannelDetails {
404 self.signals_exist = true;
405 self
406 }
407
408 pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
409 self.state = OutboundState::Suspended;
410 self
411 }
412}
413
414#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
415pub struct QueueConfigData {
416 suspend_threshold: u32,
419 drop_threshold: u32,
423 resume_threshold: u32,
426}
427
428impl Default for QueueConfigData {
429 fn default() -> Self {
430 Self {
433 drop_threshold: 48, suspend_threshold: 32, resume_threshold: 8, }
437 }
438}
439
440impl QueueConfigData {
441 pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
445 if self.resume_threshold < self.suspend_threshold &&
446 self.suspend_threshold <= self.drop_threshold &&
447 self.resume_threshold > 0
448 {
449 Ok(())
450 } else {
451 Err(Error::<T>::BadQueueConfig.into())
452 }
453 }
454}
455
456#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
457pub enum ChannelSignal {
458 Suspend,
459 Resume,
460}
461
462impl<T: Config> Pallet<T> {
463 fn send_fragment<Fragment: Encode>(
487 recipient: ParaId,
488 format: XcmpMessageFormat,
489 fragment: Fragment,
490 ) -> Result<u32, MessageSendError> {
491 let encoded_fragment = fragment.encode();
492
493 let channel_info =
497 T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
498 let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
500 let format_size = format.encoded_size();
501 let size_to_check = encoded_fragment
504 .len()
505 .checked_add(format_size)
506 .ok_or(MessageSendError::TooBig)?;
507 if size_to_check > max_message_size {
508 return Err(MessageSendError::TooBig)
509 }
510
511 let mut all_channels = <OutboundXcmpStatus<T>>::get();
512 let channel_details = if let Some(details) =
513 all_channels.iter_mut().find(|channel| channel.recipient == recipient)
514 {
515 details
516 } else {
517 all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
518 log::error!("Failed to activate HRMP channel: {:?}", e);
519 MessageSendError::TooManyChannels
520 })?;
521 all_channels
522 .last_mut()
523 .expect("can't be empty; a new element was just pushed; qed")
524 };
525 let have_active = channel_details.last_index > channel_details.first_index;
526 let appended_to_last_page = have_active
529 .then(|| {
530 <OutboundXcmpMessages<T>>::try_mutate(
531 recipient,
532 channel_details.last_index - 1,
533 |page| {
534 if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
535 defensive!("Bad format in outbound queue; dropping message");
536 return Err(())
537 }
538 if page.len() + encoded_fragment.len() > max_message_size {
539 return Err(())
540 }
541 for frag in encoded_fragment.iter() {
542 page.try_push(*frag)?;
543 }
544 Ok(page.len())
545 },
546 )
547 .ok()
548 })
549 .flatten();
550
551 let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
552 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
553 (number_of_pages, size)
554 } else {
555 let page_index = channel_details.last_index;
557 channel_details.last_index += 1;
558 let mut new_page = format.encode();
559 new_page.extend_from_slice(&encoded_fragment[..]);
560 let last_page_size = new_page.len();
561 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
562 let bounded_page =
563 BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
564 log::debug!(target: LOG_TARGET, "Failed to create bounded message page: {error:?}");
565 MessageSendError::TooBig
566 })?;
567 let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
568 <OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
569 <OutboundXcmpStatus<T>>::put(all_channels);
570 (number_of_pages, last_page_size)
571 };
572
573 let total_size =
577 number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
578 let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
579 if total_size > threshold {
580 Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
581 }
582
583 Ok(number_of_pages)
584 }
585
586 fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
589 let mut s = <OutboundXcmpStatus<T>>::get();
590 if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
591 details.signals_exist = true;
592 } else {
593 s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
594 log::debug!(target: LOG_TARGET, "Failed to activate XCMP channel: {error:?}");
595 Error::<T>::TooManyActiveOutboundChannels
596 })?;
597 }
598
599 let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
600 (XcmpMessageFormat::Signals, signal).encode(),
601 )
602 .map_err(|error| {
603 log::debug!(target: LOG_TARGET, "Failed to encode signal message: {error:?}");
604 Error::<T>::TooBig
605 })?;
606 let page = WeakBoundedVec::force_from(page.into_inner(), None);
607
608 <SignalMessages<T>>::insert(dest, page);
609 <OutboundXcmpStatus<T>>::put(s);
610 Ok(())
611 }
612
613 fn suspend_channel(target: ParaId) {
614 <OutboundXcmpStatus<T>>::mutate(|s| {
615 if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
616 let ok = details.state == OutboundState::Ok;
617 defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
618 details.state = OutboundState::Suspended;
619 } else {
620 if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
621 defensive!("Cannot pause channel; too many outbound channels");
622 }
623 }
624 });
625 }
626
627 fn resume_channel(target: ParaId) {
628 <OutboundXcmpStatus<T>>::mutate(|s| {
629 if let Some(index) = s.iter().position(|item| item.recipient == target) {
630 let suspended = s[index].state == OutboundState::Suspended;
631 defensive_assert!(
632 suspended,
633 "WARNING: Attempt to resume channel that was not suspended."
634 );
635 if s[index].first_index == s[index].last_index {
636 s.remove(index);
637 } else {
638 s[index].state = OutboundState::Ok;
639 }
640 } else {
641 defensive!("WARNING: Attempt to resume channel that was not suspended.");
642 }
643 });
644 }
645
646 fn enqueue_xcmp_messages(
647 sender: ParaId,
648 xcms: &[BoundedVec<u8, MaxXcmpMessageLenOf<T>>],
649 meter: &mut WeightMeter,
650 ) -> Result<(), ()> {
651 let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
652 let batches_footprints = T::XcmpQueue::get_batches_footprints(
653 sender,
654 xcms.iter().map(|xcm| xcm.as_bounded_slice()),
655 drop_threshold,
656 );
657
658 let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
659 let required_weight = T::WeightInfo::enqueue_xcmp_messages(
660 batches_footprints.first_page_pos.saturated_into(),
661 batch_info,
662 );
663
664 match meter.can_consume(required_weight) {
665 true => core::cmp::Ordering::Less,
666 false => core::cmp::Ordering::Greater,
667 }
668 });
669
670 meter.consume(T::WeightInfo::enqueue_xcmp_messages(
671 batches_footprints.first_page_pos.saturated_into(),
672 best_batch_footprint,
673 ));
674 T::XcmpQueue::enqueue_messages(
675 xcms.iter()
676 .take(best_batch_footprint.msgs_count)
677 .map(|xcm| xcm.as_bounded_slice()),
678 sender,
679 );
680
681 if best_batch_footprint.msgs_count < xcms.len() {
682 log::error!(
683 "Out of weight: cannot enqueue entire XCMP messages batch; \
684 dropped some or all messages in batch. Used weight: {:?}",
685 meter.consumed_ratio()
686 );
687 return Err(());
688 }
689 Ok(())
690 }
691
692 pub(crate) fn take_first_concatenated_xcm(
700 data: &mut &[u8],
701 meter: &mut WeightMeter,
702 ) -> Result<Option<BoundedVec<u8, MaxXcmpMessageLenOf<T>>>, ()> {
703 if data.is_empty() {
704 return Ok(None)
705 }
706
707 if meter.try_consume(T::WeightInfo::take_first_concatenated_xcm()).is_err() {
708 defensive!("Out of weight; could not decode all; dropping");
709 return Err(())
710 }
711
712 let xcm = VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, data).map_err(
713 |error| {
714 log::debug!(target: LOG_TARGET, "Failed to decode XCM with depth limit: {error:?}");
715 ()
716 },
717 )?;
718 Ok(Some(xcm.encode().try_into().map_err(|error| {
719 log::debug!(target: LOG_TARGET, "Failed to encode XCM after decoding: {error:?}");
720 ()
721 })?))
722 }
723
724 pub(crate) fn take_first_concatenated_xcms(
729 data: &mut &[u8],
730 batch_size: usize,
731 meter: &mut WeightMeter,
732 ) -> Result<
733 Vec<BoundedVec<u8, MaxXcmpMessageLenOf<T>>>,
734 Vec<BoundedVec<u8, MaxXcmpMessageLenOf<T>>>,
735 > {
736 let mut batch = vec![];
737 loop {
738 match Self::take_first_concatenated_xcm(data, meter) {
739 Ok(Some(xcm)) => {
740 batch.push(xcm);
741 if batch.len() >= batch_size {
742 return Ok(batch);
743 }
744 },
745 Ok(None) => return Ok(batch),
746 Err(_) => return Err(batch),
747 }
748 }
749 }
750
751 pub fn on_idle_weight() -> Weight {
753 <T as crate::Config>::WeightInfo::on_idle_good_msg()
754 .max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
755 }
756
757 #[cfg(feature = "bridging")]
758 fn is_inbound_channel_suspended(sender: ParaId) -> bool {
759 <InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
760 }
761
762 #[cfg(feature = "bridging")]
763 fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
765 <OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
766 let queued_pages = c.last_index.saturating_sub(c.first_index);
767 (c.state, queued_pages)
768 })
769 }
770}
771
772impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
773 fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
775 let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
776
777 let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
778 let suspended = suspended_channels.contains(¶);
779
780 if suspended && fp.ready_pages <= resume_threshold {
781 if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
782 log::error!("defensive: Could not send resumption signal to inbound channel of sibling {:?}: {:?}; channel remains suspended.", para, err);
783 } else {
784 suspended_channels.remove(¶);
785 <InboundXcmpSuspended<T>>::put(suspended_channels);
786 }
787 } else if !suspended && fp.ready_pages >= suspend_threshold {
788 log::warn!("XCMP queue for sibling {:?} is full; suspending channel.", para);
789
790 if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
791 log::error!(
793 "defensive: Could not send suspension signal; future messages may be dropped: {:?}", err
794 );
795 } else if let Err(err) = suspended_channels.try_insert(para) {
796 log::error!("Too many channels suspended; cannot suspend sibling {:?}: {:?}; further messages may be dropped.", para, err);
797 } else {
798 <InboundXcmpSuspended<T>>::put(suspended_channels);
799 }
800 }
801 }
802}
803
804impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
805 fn is_paused(para: &ParaId) -> bool {
806 if !QueueSuspended::<T>::get() {
807 return false
808 }
809
810 let sender_origin = T::ControllerOriginConverter::convert_origin(
812 (Parent, Parachain((*para).into())),
813 OriginKind::Superuser,
814 );
815 let is_controller =
816 sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
817
818 !is_controller
819 }
820}
821
822impl<T: Config> XcmpMessageHandler for Pallet<T> {
823 fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
824 iter: I,
825 max_weight: Weight,
826 ) -> Weight {
827 let mut meter = WeightMeter::with_limit(max_weight);
828
829 let mut known_xcm_senders = BTreeSet::new();
830 for (sender, _sent_at, mut data) in iter {
831 let format = match XcmpMessageFormat::decode(&mut data) {
832 Ok(f) => f,
833 Err(_) => {
834 defensive!("Unknown XCMP message format - dropping");
835 continue
836 },
837 };
838
839 match format {
840 XcmpMessageFormat::Signals => {
841 let mut signal_count = 0;
842 while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
843 signal_count += 1;
844 match ChannelSignal::decode(&mut data) {
845 Ok(ChannelSignal::Suspend) => {
846 if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
847 defensive!(
848 "Not enough weight to process suspend signal - dropping"
849 );
850 break
851 }
852 Self::suspend_channel(sender)
853 },
854 Ok(ChannelSignal::Resume) => {
855 if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
856 defensive!(
857 "Not enough weight to process resume signal - dropping"
858 );
859 break
860 }
861 Self::resume_channel(sender)
862 },
863 Err(_) => {
864 defensive!("Undecodable channel signal - dropping");
865 break
866 },
867 }
868 }
869 },
870 XcmpMessageFormat::ConcatenatedVersionedXcm => {
871 if known_xcm_senders.insert(sender) {
872 if meter
873 .try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
874 .is_err()
875 {
876 defensive!(
877 "Out of weight: cannot enqueue XCMP messages; dropping page; \
878 Used weight: ",
879 meter.consumed_ratio()
880 );
881 continue;
882 }
883 }
884
885 let mut can_process_next_batch = true;
886 while can_process_next_batch {
887 let batch = match Self::take_first_concatenated_xcms(
888 &mut data,
889 XCM_BATCH_SIZE,
890 &mut meter,
891 ) {
892 Ok(batch) => batch,
893 Err(batch) => {
894 can_process_next_batch = false;
895 defensive!(
896 "HRMP inbound decode stream broke; page will be dropped."
897 );
898 batch
899 },
900 };
901 if batch.is_empty() {
902 break;
903 }
904
905 if let Err(()) = Self::enqueue_xcmp_messages(sender, &batch, &mut meter) {
906 break
907 }
908 }
909 },
910 XcmpMessageFormat::ConcatenatedEncodedBlob => {
911 defensive!("Blob messages are unhandled - dropping");
912 continue
913 },
914 }
915 }
916
917 meter.consumed()
918 }
919}
920
921impl<T: Config> XcmpMessageSource for Pallet<T> {
922 fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
923 let mut statuses = <OutboundXcmpStatus<T>>::get();
924 let old_statuses_len = statuses.len();
925 let max_message_count = statuses.len().min(maximum_channels);
926 let mut result = Vec::with_capacity(max_message_count);
927
928 for status in statuses.iter_mut() {
929 let OutboundChannelDetails {
930 recipient: para_id,
931 state: outbound_state,
932 mut signals_exist,
933 mut first_index,
934 mut last_index,
935 } = *status;
936
937 let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
938 ChannelStatus::Closed => {
939 for i in first_index..last_index {
942 <OutboundXcmpMessages<T>>::remove(para_id, i);
943 }
944 if signals_exist {
945 <SignalMessages<T>>::remove(para_id);
946 }
947 *status = OutboundChannelDetails::new(para_id);
948 continue
949 },
950 ChannelStatus::Full => continue,
951 ChannelStatus::Ready(n, e) => (n, e),
952 };
953
954 if result.len() == max_message_count {
956 break
959 }
960
961 let page = if signals_exist {
962 let page = <SignalMessages<T>>::get(para_id);
963 defensive_assert!(!page.is_empty(), "Signals must exist");
964
965 if page.len() < max_size_now {
966 <SignalMessages<T>>::remove(para_id);
967 signals_exist = false;
968 page
969 } else {
970 defensive!("Signals should fit into a single page");
971 continue
972 }
973 } else if outbound_state == OutboundState::Suspended {
974 continue
976 } else if last_index > first_index {
977 let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
978 if page.len() < max_size_now {
979 <OutboundXcmpMessages<T>>::remove(para_id, first_index);
980 first_index += 1;
981 page
982 } else {
983 continue
984 }
985 } else {
986 continue
987 };
988 if first_index == last_index {
989 first_index = 0;
990 last_index = 0;
991 }
992
993 if page.len() > max_size_ever {
994 defensive!("WARNING: oversize message in queue - dropping");
998 } else {
999 result.push((para_id, page.into_inner()));
1000 }
1001
1002 let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1003 Some(channel_info) => channel_info.max_total_size,
1004 None => {
1005 log::warn!("calling `get_channel_info` with no RelevantMessagingState?!");
1006 MAX_POSSIBLE_ALLOCATION },
1008 };
1009 let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1010 let remaining_total_size: usize = (first_index..last_index)
1011 .map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1012 .sum();
1013 if remaining_total_size <= threshold as usize {
1014 Self::decrease_fee_factor(para_id);
1015 }
1016
1017 *status = OutboundChannelDetails {
1018 recipient: para_id,
1019 state: outbound_state,
1020 signals_exist,
1021 first_index,
1022 last_index,
1023 };
1024 }
1025 debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1026
1027 result.sort_by_key(|m| m.0);
1030
1031 statuses.retain(|x| {
1040 x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1041 });
1042
1043 let pruned = old_statuses_len - statuses.len();
1045 let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1048 "Could not store HRMP channels config. Some HRMP channels may be broken.",
1049 );
1050
1051 <OutboundXcmpStatus<T>>::put(statuses);
1052
1053 result
1054 }
1055}
1056
1057impl<T: Config> SendXcm for Pallet<T> {
1059 type Ticket = (ParaId, VersionedXcm<()>);
1060
1061 fn validate(
1062 dest: &mut Option<Location>,
1063 msg: &mut Option<Xcm<()>>,
1064 ) -> SendResult<(ParaId, VersionedXcm<()>)> {
1065 let d = dest.take().ok_or(SendError::MissingArgument)?;
1066
1067 match d.unpack() {
1068 (1, [Parachain(id)]) => {
1070 let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1071 let id = ParaId::from(*id);
1072 let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1073 let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1074 .map_err(|()| SendError::DestinationUnsupported)?;
1075 versioned_xcm
1076 .check_is_decodable()
1077 .map_err(|()| SendError::ExceedsMaxMessageSize)?;
1078
1079 Ok(((id, versioned_xcm), price))
1080 },
1081 _ => {
1082 *dest = Some(d);
1085 Err(SendError::NotApplicable)
1086 },
1087 }
1088 }
1089
1090 fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1091 let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1092
1093 match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1094 Ok(_) => {
1095 Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1096 Ok(hash)
1097 },
1098 Err(e) => {
1099 log::error!(target: LOG_TARGET, "Deliver error: {e:?}");
1100 Err(SendError::Transport(e.into()))
1101 },
1102 }
1103 }
1104}
1105
1106impl<T: Config> InspectMessageQueues for Pallet<T> {
1107 fn clear_messages() {
1108 let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1110 OutboundXcmpStatus::<T>::mutate(|details_vec| {
1111 for details in details_vec {
1112 details.first_index = 0;
1113 details.last_index = 0;
1114 }
1115 });
1116 }
1117
1118 fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1119 use xcm::prelude::*;
1120
1121 OutboundXcmpMessages::<T>::iter()
1122 .map(|(para_id, _, messages)| {
1123 let mut data = &messages[..];
1124 let decoded_format = XcmpMessageFormat::decode(&mut data).unwrap();
1125 if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
1126 panic!("Unexpected format.")
1127 }
1128 let mut decoded_messages = Vec::new();
1129 while !data.is_empty() {
1130 let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1131 MAX_XCM_DECODE_DEPTH,
1132 &mut data,
1133 )
1134 .unwrap();
1135 decoded_messages.push(decoded_message);
1136 }
1137
1138 (
1139 VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1140 decoded_messages,
1141 )
1142 })
1143 .collect()
1144 }
1145}
1146
1147impl<T: Config> FeeTracker for Pallet<T> {
1148 type Id = ParaId;
1149
1150 fn get_fee_factor(id: Self::Id) -> FixedU128 {
1151 <DeliveryFeeFactor<T>>::get(id)
1152 }
1153
1154 fn set_fee_factor(id: Self::Id, val: FixedU128) {
1155 <DeliveryFeeFactor<T>>::set(id, val);
1156 }
1157}