1#![cfg_attr(not(feature = "std"), no_std)]
37
38pub mod migration;
39
40#[cfg(test)]
41mod mock;
42
43#[cfg(test)]
44mod tests;
45
46#[cfg(feature = "runtime-benchmarks")]
47mod benchmarking;
48#[cfg(feature = "bridging")]
49pub mod bridging;
50pub mod weights;
51pub mod weights_ext;
52
53pub use weights::WeightInfo;
54pub use weights_ext::WeightInfoExt;
55
56extern crate alloc;
57
58use alloc::{collections::BTreeSet, vec, vec::Vec};
59use bounded_collections::{BoundedBTreeSet, BoundedSlice, BoundedVec};
60use codec::{Decode, DecodeLimit, Encode, MaxEncodedLen};
61use cumulus_primitives_core::{
62 relay_chain::BlockNumber as RelayBlockNumber, ChannelStatus, GetChannelInfo, MessageSendError,
63 ParaId, XcmpMessageFormat, XcmpMessageHandler, XcmpMessageSource,
64};
65
66use frame_support::{
67 defensive, defensive_assert,
68 traits::{
69 Defensive, EnqueueMessage, EnsureOrigin, Get, QueueFootprint, QueueFootprintQuery,
70 QueuePausedQuery,
71 },
72 weights::{Weight, WeightMeter},
73};
74use pallet_message_queue::OnQueueChanged;
75use polkadot_runtime_common::xcm_sender::PriceForMessageDelivery;
76use polkadot_runtime_parachains::{FeeTracker, GetMinFeeFactor};
77use scale_info::TypeInfo;
78use sp_core::MAX_POSSIBLE_ALLOCATION;
79use sp_runtime::{FixedU128, RuntimeDebug, SaturatedConversion, WeakBoundedVec};
80use xcm::{latest::prelude::*, VersionedLocation, VersionedXcm, WrapVersion, MAX_XCM_DECODE_DEPTH};
81use xcm_builder::InspectMessageQueues;
82use xcm_executor::traits::ConvertOrigin;
83
84pub use pallet::*;
85
86pub type OverweightIndex = u64;
88pub type MaxXcmpMessageLenOf<T> =
90 <<T as Config>::XcmpQueue as EnqueueMessage<ParaId>>::MaxMessageLen;
91
92const LOG_TARGET: &str = "xcmp_queue";
93const DEFAULT_POV_SIZE: u64 = 64 * 1024; pub const XCM_BATCH_SIZE: usize = 250;
96pub const MAX_SIGNALS_PER_PAGE: usize = 3;
98
99pub mod delivery_fee_constants {
101 pub const THRESHOLD_FACTOR: u32 = 2;
103}
104
105#[frame_support::pallet]
106pub mod pallet {
107 use super::*;
108 use frame_support::{pallet_prelude::*, Twox64Concat};
109 use frame_system::pallet_prelude::*;
110
111 #[pallet::pallet]
112 #[pallet::storage_version(migration::STORAGE_VERSION)]
113 pub struct Pallet<T>(_);
114
115 #[pallet::config]
116 pub trait Config: frame_system::Config {
117 #[allow(deprecated)]
118 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
119
120 type ChannelInfo: GetChannelInfo;
122
123 type VersionWrapper: WrapVersion;
125
126 type XcmpQueue: EnqueueMessage<ParaId>
131 + QueueFootprintQuery<ParaId, MaxMessageLen = MaxXcmpMessageLenOf<Self>>;
132
133 #[pallet::constant]
139 type MaxInboundSuspended: Get<u32>;
140
141 #[pallet::constant]
150 type MaxActiveOutboundChannels: Get<u32>;
151
152 #[pallet::constant]
158 type MaxPageSize: Get<u32>;
159
160 type ControllerOrigin: EnsureOrigin<Self::RuntimeOrigin>;
162
163 type ControllerOriginConverter: ConvertOrigin<Self::RuntimeOrigin>;
166
167 type PriceForSiblingDelivery: PriceForMessageDelivery<Id = ParaId>;
169
170 type WeightInfo: WeightInfoExt;
172 }
173
174 #[pallet::call]
175 impl<T: Config> Pallet<T> {
176 #[pallet::call_index(1)]
180 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
181 pub fn suspend_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
182 T::ControllerOrigin::ensure_origin(origin)?;
183
184 QueueSuspended::<T>::try_mutate(|suspended| {
185 if *suspended {
186 Err(Error::<T>::AlreadySuspended.into())
187 } else {
188 *suspended = true;
189 Ok(())
190 }
191 })
192 }
193
194 #[pallet::call_index(2)]
200 #[pallet::weight((T::DbWeight::get().writes(1), DispatchClass::Operational,))]
201 pub fn resume_xcm_execution(origin: OriginFor<T>) -> DispatchResult {
202 T::ControllerOrigin::ensure_origin(origin)?;
203
204 QueueSuspended::<T>::try_mutate(|suspended| {
205 if !*suspended {
206 Err(Error::<T>::AlreadyResumed.into())
207 } else {
208 *suspended = false;
209 Ok(())
210 }
211 })
212 }
213
214 #[pallet::call_index(3)]
220 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
221 pub fn update_suspend_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
222 ensure_root(origin)?;
223
224 QueueConfig::<T>::try_mutate(|data| {
225 data.suspend_threshold = new;
226 data.validate::<T>()
227 })
228 }
229
230 #[pallet::call_index(4)]
236 #[pallet::weight((T::WeightInfo::set_config_with_u32(),DispatchClass::Operational,))]
237 pub fn update_drop_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
238 ensure_root(origin)?;
239
240 QueueConfig::<T>::try_mutate(|data| {
241 data.drop_threshold = new;
242 data.validate::<T>()
243 })
244 }
245
246 #[pallet::call_index(5)]
252 #[pallet::weight((T::WeightInfo::set_config_with_u32(), DispatchClass::Operational,))]
253 pub fn update_resume_threshold(origin: OriginFor<T>, new: u32) -> DispatchResult {
254 ensure_root(origin)?;
255
256 QueueConfig::<T>::try_mutate(|data| {
257 data.resume_threshold = new;
258 data.validate::<T>()
259 })
260 }
261 }
262
263 #[pallet::hooks]
264 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
265 fn integrity_test() {
266 assert!(!T::MaxPageSize::get().is_zero(), "MaxPageSize too low");
267
268 let w = Self::on_idle_weight();
269 assert!(w != Weight::zero());
270 assert!(w.all_lte(T::BlockWeights::get().max_block));
271
272 <T::WeightInfo as WeightInfoExt>::check_accuracy::<MaxXcmpMessageLenOf<T>>(0.15);
273 }
274
275 fn on_idle(_block: BlockNumberFor<T>, limit: Weight) -> Weight {
276 let mut meter = WeightMeter::with_limit(limit);
277
278 if meter.try_consume(Self::on_idle_weight()).is_err() {
279 tracing::debug!(
280 target: LOG_TARGET,
281 "Not enough weight for on_idle. {} < {}",
282 Self::on_idle_weight(), limit
283 );
284 return meter.consumed()
285 }
286
287 migration::v3::lazy_migrate_inbound_queue::<T>();
288
289 meter.consumed()
290 }
291 }
292
293 #[pallet::event]
294 #[pallet::generate_deposit(pub(super) fn deposit_event)]
295 pub enum Event<T: Config> {
296 XcmpMessageSent { message_hash: XcmHash },
298 }
299
300 #[pallet::error]
301 pub enum Error<T> {
302 BadQueueConfig,
304 AlreadySuspended,
306 AlreadyResumed,
308 TooManyActiveOutboundChannels,
310 TooBig,
312 }
313
314 #[pallet::storage]
323 pub type InboundXcmpSuspended<T: Config> =
324 StorageValue<_, BoundedBTreeSet<ParaId, T::MaxInboundSuspended>, ValueQuery>;
325
326 #[pallet::storage]
333 pub(super) type OutboundXcmpStatus<T: Config> = StorageValue<
334 _,
335 BoundedVec<OutboundChannelDetails, T::MaxActiveOutboundChannels>,
336 ValueQuery,
337 >;
338
339 #[pallet::storage]
341 pub(super) type OutboundXcmpMessages<T: Config> = StorageDoubleMap<
342 _,
343 Blake2_128Concat,
344 ParaId,
345 Twox64Concat,
346 u16,
347 WeakBoundedVec<u8, T::MaxPageSize>,
348 ValueQuery,
349 >;
350
351 #[pallet::storage]
353 pub(super) type SignalMessages<T: Config> =
354 StorageMap<_, Blake2_128Concat, ParaId, WeakBoundedVec<u8, T::MaxPageSize>, ValueQuery>;
355
356 #[pallet::storage]
358 pub(super) type QueueConfig<T: Config> = StorageValue<_, QueueConfigData, ValueQuery>;
359
360 #[pallet::storage]
362 pub(super) type QueueSuspended<T: Config> = StorageValue<_, bool, ValueQuery>;
363
364 #[pallet::storage]
366 pub(super) type DeliveryFeeFactor<T: Config> =
367 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pallet<T>>>;
368}
369
370#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
371pub enum OutboundState {
372 Ok,
373 Suspended,
374}
375
376#[derive(Clone, Eq, PartialEq, Encode, Decode, TypeInfo, RuntimeDebug, MaxEncodedLen)]
378pub struct OutboundChannelDetails {
379 recipient: ParaId,
381 state: OutboundState,
383 signals_exist: bool,
385 first_index: u16,
387 last_index: u16,
389}
390
391impl OutboundChannelDetails {
392 pub fn new(recipient: ParaId) -> OutboundChannelDetails {
393 OutboundChannelDetails {
394 recipient,
395 state: OutboundState::Ok,
396 signals_exist: false,
397 first_index: 0,
398 last_index: 0,
399 }
400 }
401
402 pub fn with_signals(mut self) -> OutboundChannelDetails {
403 self.signals_exist = true;
404 self
405 }
406
407 pub fn with_suspended_state(mut self) -> OutboundChannelDetails {
408 self.state = OutboundState::Suspended;
409 self
410 }
411}
412
413#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, RuntimeDebug, TypeInfo, MaxEncodedLen)]
414pub struct QueueConfigData {
415 suspend_threshold: u32,
418 drop_threshold: u32,
422 resume_threshold: u32,
425}
426
427impl Default for QueueConfigData {
428 fn default() -> Self {
429 Self {
432 drop_threshold: 48, suspend_threshold: 32, resume_threshold: 8, }
436 }
437}
438
439impl QueueConfigData {
440 pub fn validate<T: crate::Config>(&self) -> sp_runtime::DispatchResult {
444 if self.resume_threshold < self.suspend_threshold &&
445 self.suspend_threshold <= self.drop_threshold &&
446 self.resume_threshold > 0
447 {
448 Ok(())
449 } else {
450 Err(Error::<T>::BadQueueConfig.into())
451 }
452 }
453}
454
455#[derive(PartialEq, Eq, Copy, Clone, Encode, Decode, TypeInfo)]
456pub enum ChannelSignal {
457 Suspend,
458 Resume,
459}
460
461impl<T: Config> Pallet<T> {
462 fn send_fragment<Fragment: Encode>(
486 recipient: ParaId,
487 format: XcmpMessageFormat,
488 fragment: Fragment,
489 ) -> Result<u32, MessageSendError> {
490 let encoded_fragment = fragment.encode();
491
492 let channel_info =
496 T::ChannelInfo::get_channel_info(recipient).ok_or(MessageSendError::NoChannel)?;
497 let max_message_size = channel_info.max_message_size.min(T::MaxPageSize::get()) as usize;
499 let format_size = format.encoded_size();
500 let size_to_check = encoded_fragment
503 .len()
504 .checked_add(format_size)
505 .ok_or(MessageSendError::TooBig)?;
506 if size_to_check > max_message_size {
507 return Err(MessageSendError::TooBig)
508 }
509
510 let mut all_channels = <OutboundXcmpStatus<T>>::get();
511 let channel_details = if let Some(details) =
512 all_channels.iter_mut().find(|channel| channel.recipient == recipient)
513 {
514 details
515 } else {
516 all_channels.try_push(OutboundChannelDetails::new(recipient)).map_err(|e| {
517 tracing::error!(target: LOG_TARGET, error=?e, "Failed to activate HRMP channel");
518 MessageSendError::TooManyChannels
519 })?;
520 all_channels
521 .last_mut()
522 .expect("can't be empty; a new element was just pushed; qed")
523 };
524 let have_active = channel_details.last_index > channel_details.first_index;
525 let appended_to_last_page = have_active
528 .then(|| {
529 <OutboundXcmpMessages<T>>::try_mutate(
530 recipient,
531 channel_details.last_index - 1,
532 |page| {
533 if XcmpMessageFormat::decode(&mut &page[..]) != Ok(format) {
534 defensive!("Bad format in outbound queue; dropping message");
535 return Err(())
536 }
537 if page.len() + encoded_fragment.len() > max_message_size {
538 return Err(())
539 }
540 for frag in encoded_fragment.iter() {
541 page.try_push(*frag)?;
542 }
543 Ok(page.len())
544 },
545 )
546 .ok()
547 })
548 .flatten();
549
550 let (number_of_pages, last_page_size) = if let Some(size) = appended_to_last_page {
551 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
552 (number_of_pages, size)
553 } else {
554 let page_index = channel_details.last_index;
556 channel_details.last_index += 1;
557 let mut new_page = format.encode();
558 new_page.extend_from_slice(&encoded_fragment[..]);
559 let last_page_size = new_page.len();
560 let number_of_pages = (channel_details.last_index - channel_details.first_index) as u32;
561 let bounded_page =
562 BoundedVec::<u8, T::MaxPageSize>::try_from(new_page).map_err(|error| {
563 tracing::debug!(target: LOG_TARGET, ?error, "Failed to create bounded message page");
564 MessageSendError::TooBig
565 })?;
566 let bounded_page = WeakBoundedVec::force_from(bounded_page.into_inner(), None);
567 <OutboundXcmpMessages<T>>::insert(recipient, page_index, bounded_page);
568 <OutboundXcmpStatus<T>>::put(all_channels);
569 (number_of_pages, last_page_size)
570 };
571
572 let total_size =
576 number_of_pages.saturating_sub(1) * max_message_size as u32 + last_page_size as u32;
577 let threshold = channel_info.max_total_size / delivery_fee_constants::THRESHOLD_FACTOR;
578 if total_size > threshold {
579 Self::increase_fee_factor(recipient, encoded_fragment.len() as u128);
580 }
581
582 Ok(number_of_pages)
583 }
584
585 fn send_signal(dest: ParaId, signal: ChannelSignal) -> Result<(), Error<T>> {
588 let mut s = <OutboundXcmpStatus<T>>::get();
589 if let Some(details) = s.iter_mut().find(|item| item.recipient == dest) {
590 details.signals_exist = true;
591 } else {
592 s.try_push(OutboundChannelDetails::new(dest).with_signals()).map_err(|error| {
593 tracing::debug!(target: LOG_TARGET, ?error, "Failed to activate XCMP channel");
594 Error::<T>::TooManyActiveOutboundChannels
595 })?;
596 }
597
598 let page = BoundedVec::<u8, T::MaxPageSize>::try_from(
599 (XcmpMessageFormat::Signals, signal).encode(),
600 )
601 .map_err(|error| {
602 tracing::debug!(target: LOG_TARGET, ?error, "Failed to encode signal message");
603 Error::<T>::TooBig
604 })?;
605 let page = WeakBoundedVec::force_from(page.into_inner(), None);
606
607 <SignalMessages<T>>::insert(dest, page);
608 <OutboundXcmpStatus<T>>::put(s);
609 Ok(())
610 }
611
612 fn suspend_channel(target: ParaId) {
613 <OutboundXcmpStatus<T>>::mutate(|s| {
614 if let Some(details) = s.iter_mut().find(|item| item.recipient == target) {
615 let ok = details.state == OutboundState::Ok;
616 defensive_assert!(ok, "WARNING: Attempt to suspend channel that was not Ok.");
617 details.state = OutboundState::Suspended;
618 } else {
619 if s.try_push(OutboundChannelDetails::new(target).with_suspended_state()).is_err() {
620 defensive!("Cannot pause channel; too many outbound channels");
621 }
622 }
623 });
624 }
625
626 fn resume_channel(target: ParaId) {
627 <OutboundXcmpStatus<T>>::mutate(|s| {
628 if let Some(index) = s.iter().position(|item| item.recipient == target) {
629 let suspended = s[index].state == OutboundState::Suspended;
630 defensive_assert!(
631 suspended,
632 "WARNING: Attempt to resume channel that was not suspended."
633 );
634 if s[index].first_index == s[index].last_index {
635 s.remove(index);
636 } else {
637 s[index].state = OutboundState::Ok;
638 }
639 } else {
640 defensive!("WARNING: Attempt to resume channel that was not suspended.");
641 }
642 });
643 }
644
645 fn enqueue_xcmp_messages<'a>(
646 sender: ParaId,
647 xcms: &[BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>],
648 meter: &mut WeightMeter,
649 ) -> Result<(), ()> {
650 let QueueConfigData { drop_threshold, .. } = <QueueConfig<T>>::get();
651 let batches_footprints =
652 T::XcmpQueue::get_batches_footprints(sender, xcms.iter().copied(), drop_threshold);
653
654 let best_batch_footprint = batches_footprints.search_best_by(|batch_info| {
655 let required_weight = T::WeightInfo::enqueue_xcmp_messages(
656 batches_footprints.first_page_pos.saturated_into(),
657 batch_info,
658 );
659
660 match meter.can_consume(required_weight) {
661 true => core::cmp::Ordering::Less,
662 false => core::cmp::Ordering::Greater,
663 }
664 });
665
666 meter.consume(T::WeightInfo::enqueue_xcmp_messages(
667 batches_footprints.first_page_pos.saturated_into(),
668 best_batch_footprint,
669 ));
670 T::XcmpQueue::enqueue_messages(
671 xcms.iter().take(best_batch_footprint.msgs_count).copied(),
672 sender,
673 );
674
675 if best_batch_footprint.msgs_count < xcms.len() {
676 tracing::error!(
677 target: LOG_TARGET,
678 used_weight=?meter.consumed_ratio(),
679 "Out of weight: cannot enqueue entire XCMP messages batch; \
680 dropped some or all messages in batch."
681 );
682 return Err(());
683 }
684 Ok(())
685 }
686
687 pub(crate) fn take_first_concatenated_xcm<'a>(
695 data: &mut &'a [u8],
696 meter: &mut WeightMeter,
697 ) -> Result<Option<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>, ()> {
698 if data.is_empty() {
699 return Ok(None)
700 }
701
702 let base_weight = T::WeightInfo::take_first_concatenated_xcm(0);
704 if meter.try_consume(base_weight).is_err() {
705 defensive!("Out of weight; could not decode all; dropping");
706 return Err(())
707 }
708
709 let input_data = &mut &data[..];
710 let mut input = codec::CountedInput::new(input_data);
711 VersionedXcm::<()>::decode_with_depth_limit(MAX_XCM_DECODE_DEPTH, &mut input).map_err(
712 |error| {
713 tracing::debug!(target: LOG_TARGET, ?error, "Failed to decode XCM with depth limit");
714 ()
715 },
716 )?;
717 let (xcm_data, remaining_data) = data.split_at(input.count() as usize);
718 *data = remaining_data;
719
720 let extra_weight =
724 T::WeightInfo::take_first_concatenated_xcm(xcm_data.len() as u32) - base_weight;
725 meter.consume(extra_weight);
726
727 let xcm = Some(BoundedSlice::try_from(xcm_data).map_err(|error| {
728 tracing::error!(
729 target: LOG_TARGET,
730 ?error,
731 "Failed to take XCM after decoding: message is too long"
732 );
733 ()
734 })?);
735
736 Ok(xcm)
737 }
738
739 pub(crate) fn take_first_concatenated_xcms<'a>(
744 data: &mut &'a [u8],
745 batch_size: usize,
746 meter: &mut WeightMeter,
747 ) -> Result<
748 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
749 Vec<BoundedSlice<'a, u8, MaxXcmpMessageLenOf<T>>>,
750 > {
751 let mut batch = vec![];
752 loop {
753 match Self::take_first_concatenated_xcm(data, meter) {
754 Ok(Some(xcm)) => {
755 batch.push(xcm);
756 if batch.len() >= batch_size {
757 return Ok(batch);
758 }
759 },
760 Ok(None) => return Ok(batch),
761 Err(_) => return Err(batch),
762 }
763 }
764 }
765
766 pub fn on_idle_weight() -> Weight {
768 <T as crate::Config>::WeightInfo::on_idle_good_msg()
769 .max(<T as crate::Config>::WeightInfo::on_idle_large_msg())
770 }
771
772 #[cfg(feature = "bridging")]
773 fn is_inbound_channel_suspended(sender: ParaId) -> bool {
774 <InboundXcmpSuspended<T>>::get().iter().any(|c| c == &sender)
775 }
776
777 #[cfg(feature = "bridging")]
778 fn outbound_channel_state(target: ParaId) -> Option<(OutboundState, u16)> {
780 <OutboundXcmpStatus<T>>::get().iter().find(|c| c.recipient == target).map(|c| {
781 let queued_pages = c.last_index.saturating_sub(c.first_index);
782 (c.state, queued_pages)
783 })
784 }
785}
786
787impl<T: Config> OnQueueChanged<ParaId> for Pallet<T> {
788 fn on_queue_changed(para: ParaId, fp: QueueFootprint) {
790 let QueueConfigData { resume_threshold, suspend_threshold, .. } = <QueueConfig<T>>::get();
791
792 let mut suspended_channels = <InboundXcmpSuspended<T>>::get();
793 let suspended = suspended_channels.contains(¶);
794
795 if suspended && fp.ready_pages <= resume_threshold {
796 if let Err(err) = Self::send_signal(para, ChannelSignal::Resume) {
797 tracing::error!(
798 target: LOG_TARGET,
799 error=?err,
800 sibling=?para,
801 "defensive: Could not send resumption signal to inbound channel of sibling; channel remains suspended."
802 );
803 } else {
804 suspended_channels.remove(¶);
805 <InboundXcmpSuspended<T>>::put(suspended_channels);
806 }
807 } else if !suspended && fp.ready_pages >= suspend_threshold {
808 tracing::warn!(target: LOG_TARGET, sibling=?para, "XCMP queue for sibling is full; suspending channel.");
809
810 if let Err(err) = Self::send_signal(para, ChannelSignal::Suspend) {
811 tracing::error!(
813 target: LOG_TARGET, error=?err,
814 "defensive: Could not send suspension signal; future messages may be dropped."
815 );
816 } else if let Err(err) = suspended_channels.try_insert(para) {
817 tracing::error!(
818 target: LOG_TARGET,
819 error=?err,
820 sibling=?para,
821 "Too many channels suspended; cannot suspend sibling; further messages may be dropped."
822 );
823 } else {
824 <InboundXcmpSuspended<T>>::put(suspended_channels);
825 }
826 }
827 }
828}
829
830impl<T: Config> QueuePausedQuery<ParaId> for Pallet<T> {
831 fn is_paused(para: &ParaId) -> bool {
832 if !QueueSuspended::<T>::get() {
833 return false
834 }
835
836 let sender_origin = T::ControllerOriginConverter::convert_origin(
838 (Parent, Parachain((*para).into())),
839 OriginKind::Superuser,
840 );
841 let is_controller =
842 sender_origin.map_or(false, |origin| T::ControllerOrigin::try_origin(origin).is_ok());
843
844 !is_controller
845 }
846}
847
848impl<T: Config> XcmpMessageHandler for Pallet<T> {
849 fn handle_xcmp_messages<'a, I: Iterator<Item = (ParaId, RelayBlockNumber, &'a [u8])>>(
850 iter: I,
851 max_weight: Weight,
852 ) -> Weight {
853 let mut meter = WeightMeter::with_limit(max_weight);
854
855 let mut known_xcm_senders = BTreeSet::new();
856 for (sender, _sent_at, mut data) in iter {
857 let format = match XcmpMessageFormat::decode(&mut data) {
858 Ok(f) => f,
859 Err(_) => {
860 defensive!("Unknown XCMP message format - dropping");
861 continue
862 },
863 };
864
865 match format {
866 XcmpMessageFormat::Signals => {
867 let mut signal_count = 0;
868 while !data.is_empty() && signal_count < MAX_SIGNALS_PER_PAGE {
869 signal_count += 1;
870 match ChannelSignal::decode(&mut data) {
871 Ok(ChannelSignal::Suspend) => {
872 if meter.try_consume(T::WeightInfo::suspend_channel()).is_err() {
873 defensive!(
874 "Not enough weight to process suspend signal - dropping"
875 );
876 break
877 }
878 Self::suspend_channel(sender)
879 },
880 Ok(ChannelSignal::Resume) => {
881 if meter.try_consume(T::WeightInfo::resume_channel()).is_err() {
882 defensive!(
883 "Not enough weight to process resume signal - dropping"
884 );
885 break
886 }
887 Self::resume_channel(sender)
888 },
889 Err(_) => {
890 defensive!("Undecodable channel signal - dropping");
891 break
892 },
893 }
894 }
895 },
896 XcmpMessageFormat::ConcatenatedVersionedXcm => {
897 if known_xcm_senders.insert(sender) {
898 if meter
899 .try_consume(T::WeightInfo::uncached_enqueue_xcmp_messages())
900 .is_err()
901 {
902 defensive!(
903 "Out of weight: cannot enqueue XCMP messages; dropping page; \
904 Used weight: ",
905 meter.consumed_ratio()
906 );
907 continue;
908 }
909 }
910
911 let mut can_process_next_batch = true;
912 while can_process_next_batch {
913 let batch = match Self::take_first_concatenated_xcms(
914 &mut data,
915 XCM_BATCH_SIZE,
916 &mut meter,
917 ) {
918 Ok(batch) => batch,
919 Err(batch) => {
920 can_process_next_batch = false;
921 defensive!(
922 "HRMP inbound decode stream broke; page will be dropped."
923 );
924 batch
925 },
926 };
927 if batch.is_empty() {
928 break;
929 }
930
931 if let Err(()) = Self::enqueue_xcmp_messages(sender, &batch, &mut meter) {
932 break
933 }
934 }
935 },
936 XcmpMessageFormat::ConcatenatedEncodedBlob => {
937 defensive!("Blob messages are unhandled - dropping");
938 continue
939 },
940 }
941 }
942
943 meter.consumed()
944 }
945}
946
947impl<T: Config> XcmpMessageSource for Pallet<T> {
948 fn take_outbound_messages(maximum_channels: usize) -> Vec<(ParaId, Vec<u8>)> {
949 let mut statuses = <OutboundXcmpStatus<T>>::get();
950 let old_statuses_len = statuses.len();
951 let max_message_count = statuses.len().min(maximum_channels);
952 let mut result = Vec::with_capacity(max_message_count);
953
954 for status in statuses.iter_mut() {
955 let OutboundChannelDetails {
956 recipient: para_id,
957 state: outbound_state,
958 mut signals_exist,
959 mut first_index,
960 mut last_index,
961 } = *status;
962
963 let (max_size_now, max_size_ever) = match T::ChannelInfo::get_channel_status(para_id) {
964 ChannelStatus::Closed => {
965 for i in first_index..last_index {
968 <OutboundXcmpMessages<T>>::remove(para_id, i);
969 }
970 if signals_exist {
971 <SignalMessages<T>>::remove(para_id);
972 }
973 *status = OutboundChannelDetails::new(para_id);
974 continue
975 },
976 ChannelStatus::Full => continue,
977 ChannelStatus::Ready(n, e) => (n, e),
978 };
979
980 if result.len() == max_message_count {
982 break
985 }
986
987 let page = if signals_exist {
988 let page = <SignalMessages<T>>::get(para_id);
989 defensive_assert!(!page.is_empty(), "Signals must exist");
990
991 if page.len() < max_size_now {
992 <SignalMessages<T>>::remove(para_id);
993 signals_exist = false;
994 page
995 } else {
996 defensive!("Signals should fit into a single page");
997 continue
998 }
999 } else if outbound_state == OutboundState::Suspended {
1000 continue
1002 } else if last_index > first_index {
1003 let page = <OutboundXcmpMessages<T>>::get(para_id, first_index);
1004 if page.len() < max_size_now {
1005 <OutboundXcmpMessages<T>>::remove(para_id, first_index);
1006 first_index += 1;
1007 page
1008 } else {
1009 continue
1010 }
1011 } else {
1012 continue
1013 };
1014 if first_index == last_index {
1015 first_index = 0;
1016 last_index = 0;
1017 }
1018
1019 if page.len() > max_size_ever {
1020 defensive!("WARNING: oversize message in queue - dropping");
1024 } else {
1025 result.push((para_id, page.into_inner()));
1026 }
1027
1028 let max_total_size = match T::ChannelInfo::get_channel_info(para_id) {
1029 Some(channel_info) => channel_info.max_total_size,
1030 None => {
1031 tracing::warn!(target: LOG_TARGET, "calling `get_channel_info` with no RelevantMessagingState?!");
1032 MAX_POSSIBLE_ALLOCATION },
1034 };
1035 let threshold = max_total_size.saturating_div(delivery_fee_constants::THRESHOLD_FACTOR);
1036 let remaining_total_size: usize = (first_index..last_index)
1037 .map(|index| OutboundXcmpMessages::<T>::decode_len(para_id, index).unwrap())
1038 .sum();
1039 if remaining_total_size <= threshold as usize {
1040 Self::decrease_fee_factor(para_id);
1041 }
1042
1043 *status = OutboundChannelDetails {
1044 recipient: para_id,
1045 state: outbound_state,
1046 signals_exist,
1047 first_index,
1048 last_index,
1049 };
1050 }
1051 debug_assert!(!statuses.iter().any(|s| s.signals_exist), "Signals should be handled");
1052
1053 result.sort_by_key(|m| m.0);
1056
1057 statuses.retain(|x| {
1066 x.state == OutboundState::Suspended || x.signals_exist || x.first_index < x.last_index
1067 });
1068
1069 let pruned = old_statuses_len - statuses.len();
1071 let _ = statuses.try_rotate_left(result.len().saturating_sub(pruned)).defensive_proof(
1074 "Could not store HRMP channels config. Some HRMP channels may be broken.",
1075 );
1076
1077 <OutboundXcmpStatus<T>>::put(statuses);
1078
1079 result
1080 }
1081}
1082
1083impl<T: Config> SendXcm for Pallet<T> {
1085 type Ticket = (ParaId, VersionedXcm<()>);
1086
1087 fn validate(
1088 dest: &mut Option<Location>,
1089 msg: &mut Option<Xcm<()>>,
1090 ) -> SendResult<(ParaId, VersionedXcm<()>)> {
1091 let d = dest.take().ok_or(SendError::MissingArgument)?;
1092
1093 match d.unpack() {
1094 (1, [Parachain(id)]) => {
1096 let xcm = msg.take().ok_or(SendError::MissingArgument)?;
1097 let id = ParaId::from(*id);
1098 let price = T::PriceForSiblingDelivery::price_for_delivery(id, &xcm);
1099 let versioned_xcm = T::VersionWrapper::wrap_version(&d, xcm)
1100 .map_err(|()| SendError::DestinationUnsupported)?;
1101 versioned_xcm
1102 .check_is_decodable()
1103 .map_err(|()| SendError::ExceedsMaxMessageSize)?;
1104
1105 Ok(((id, versioned_xcm), price))
1106 },
1107 _ => {
1108 *dest = Some(d);
1111 Err(SendError::NotApplicable)
1112 },
1113 }
1114 }
1115
1116 fn deliver((id, xcm): (ParaId, VersionedXcm<()>)) -> Result<XcmHash, SendError> {
1117 let hash = xcm.using_encoded(sp_io::hashing::blake2_256);
1118
1119 match Self::send_fragment(id, XcmpMessageFormat::ConcatenatedVersionedXcm, xcm) {
1120 Ok(_) => {
1121 Self::deposit_event(Event::XcmpMessageSent { message_hash: hash });
1122 Ok(hash)
1123 },
1124 Err(e) => {
1125 tracing::error!(target: LOG_TARGET, error=?e, "Deliver error");
1126 Err(SendError::Transport(e.into()))
1127 },
1128 }
1129 }
1130}
1131
1132impl<T: Config> InspectMessageQueues for Pallet<T> {
1133 fn clear_messages() {
1134 let _ = OutboundXcmpMessages::<T>::clear(u32::MAX, None);
1136 OutboundXcmpStatus::<T>::mutate(|details_vec| {
1137 for details in details_vec {
1138 details.first_index = 0;
1139 details.last_index = 0;
1140 }
1141 });
1142 }
1143
1144 fn get_messages() -> Vec<(VersionedLocation, Vec<VersionedXcm<()>>)> {
1145 use xcm::prelude::*;
1146
1147 OutboundXcmpMessages::<T>::iter()
1148 .map(|(para_id, _, messages)| {
1149 let mut data = &messages[..];
1150 let decoded_format = XcmpMessageFormat::decode(&mut data).unwrap();
1151 if decoded_format != XcmpMessageFormat::ConcatenatedVersionedXcm {
1152 panic!("Unexpected format.")
1153 }
1154 let mut decoded_messages = Vec::new();
1155 while !data.is_empty() {
1156 let decoded_message = VersionedXcm::<()>::decode_with_depth_limit(
1157 MAX_XCM_DECODE_DEPTH,
1158 &mut data,
1159 )
1160 .unwrap();
1161 decoded_messages.push(decoded_message);
1162 }
1163
1164 (
1165 VersionedLocation::from(Location::new(1, Parachain(para_id.into()))),
1166 decoded_messages,
1167 )
1168 })
1169 .collect()
1170 }
1171}
1172
1173impl<T: Config> FeeTracker for Pallet<T> {
1174 type Id = ParaId;
1175
1176 fn get_fee_factor(id: Self::Id) -> FixedU128 {
1177 <DeliveryFeeFactor<T>>::get(id)
1178 }
1179
1180 fn set_fee_factor(id: Self::Id, val: FixedU128) {
1181 <DeliveryFeeFactor<T>>::set(id, val);
1182 }
1183}