1#![deny(missing_docs)]
197#![cfg_attr(not(feature = "std"), no_std)]
198
199mod benchmarking;
200mod integration_test;
201mod mock;
202pub mod mock_helpers;
203mod tests;
204pub mod weights;
205
206extern crate alloc;
207
208use alloc::{vec, vec::Vec};
209use codec::{Codec, Decode, Encode, MaxEncodedLen};
210use core::{fmt::Debug, ops::Deref};
211use frame_support::{
212 defensive,
213 pallet_prelude::*,
214 traits::{
215 Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
216 ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
217 QueuePausedQuery, ServiceQueues,
218 },
219 BoundedSlice, CloneNoBound, DefaultNoBound,
220};
221use frame_system::pallet_prelude::*;
222pub use pallet::*;
223use scale_info::TypeInfo;
224use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
225use sp_core::{defer, H256};
226use sp_runtime::{
227 traits::{One, Zero},
228 SaturatedConversion, Saturating, TransactionOutcome,
229};
230use sp_weights::WeightMeter;
231pub use weights::WeightInfo;
232
233type PageIndex = u32;
235
236#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
238pub struct ItemHeader<Size> {
239 payload_len: Size,
242 is_processed: bool,
244}
245
246#[derive(
248 CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
249)]
250#[scale_info(skip_type_params(HeapSize))]
251#[codec(mel_bound(Size: MaxEncodedLen))]
252pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
253 remaining: Size,
256 remaining_size: Size,
260 first_index: Size,
262 first: Size,
265 last: Size,
267 heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
269}
270
271impl<
272 Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
273 HeapSize: Get<Size>,
274 > Page<Size, HeapSize>
275{
276 fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
278 let payload_len = message.len();
279 let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
280 let payload_len = payload_len.saturated_into();
281 let header = ItemHeader::<Size> { payload_len, is_processed: false };
282
283 let mut heap = Vec::with_capacity(data_len);
284 header.using_encoded(|h| heap.extend_from_slice(h));
285 heap.extend_from_slice(message.deref());
286
287 Page {
288 remaining: One::one(),
289 remaining_size: payload_len,
290 first_index: Zero::zero(),
291 first: Zero::zero(),
292 last: Zero::zero(),
293 heap: BoundedVec::defensive_truncate_from(heap),
294 }
295 }
296
297 fn try_append_message<T: Config>(
299 &mut self,
300 message: BoundedSlice<u8, MaxMessageLenOf<T>>,
301 ) -> Result<(), ()> {
302 let pos = self.heap.len();
303 let payload_len = message.len();
304 let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
305 let payload_len = payload_len.saturated_into();
306 let header = ItemHeader::<Size> { payload_len, is_processed: false };
307 let heap_size: u32 = HeapSize::get().into();
308 if (heap_size as usize).saturating_sub(self.heap.len()) < data_len {
309 return Err(())
311 }
312
313 let mut heap = core::mem::take(&mut self.heap).into_inner();
314 header.using_encoded(|h| heap.extend_from_slice(h));
315 heap.extend_from_slice(message.deref());
316 self.heap = BoundedVec::defensive_truncate_from(heap);
317 self.last = pos.saturated_into();
318 self.remaining.saturating_inc();
319 self.remaining_size.saturating_accrue(payload_len);
320 Ok(())
321 }
322
323 fn peek_first(&self) -> Option<BoundedSlice<u8, IntoU32<HeapSize, Size>>> {
327 if self.first > self.last {
328 return None
329 }
330 let f = (self.first.into() as usize).min(self.heap.len());
331 let mut item_slice = &self.heap[f..];
332 if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
333 let payload_len = h.payload_len.into() as usize;
334 if payload_len <= item_slice.len() {
335 return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]))
338 }
339 }
340 defensive!("message-queue: heap corruption");
341 None
342 }
343
344 fn skip_first(&mut self, is_processed: bool) {
346 let f = (self.first.into() as usize).min(self.heap.len());
347 if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
348 if is_processed && !h.is_processed {
349 h.is_processed = true;
350 h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
351 self.remaining.saturating_dec();
352 self.remaining_size.saturating_reduce(h.payload_len);
353 }
354 self.first
355 .saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
356 self.first.saturating_accrue(h.payload_len);
357 self.first_index.saturating_inc();
358 }
359 }
360
361 fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
363 let mut pos = 0;
364 let mut item_slice = &self.heap[..];
365 let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
366 for _ in 0..index {
367 let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
368 let item_len = h.payload_len.into() as usize;
369 if item_slice.len() < item_len {
370 return None
371 }
372 item_slice = &item_slice[item_len..];
373 pos.saturating_accrue(header_len.saturating_add(item_len));
374 }
375 let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
376 if item_slice.len() < h.payload_len.into() as usize {
377 return None
378 }
379 item_slice = &item_slice[..h.payload_len.into() as usize];
380 Some((pos, h.is_processed, item_slice))
381 }
382
383 fn note_processed_at_pos(&mut self, pos: usize) {
388 if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
389 if !h.is_processed {
390 h.is_processed = true;
391 h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
392 self.remaining.saturating_dec();
393 self.remaining_size.saturating_reduce(h.payload_len);
394 }
395 }
396 }
397
398 fn is_complete(&self) -> bool {
400 self.remaining.is_zero()
401 }
402}
403
404#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
406pub struct Neighbours<MessageOrigin> {
407 prev: MessageOrigin,
409 next: MessageOrigin,
411}
412
413#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
419pub struct BookState<MessageOrigin> {
420 begin: PageIndex,
423 end: PageIndex,
425 count: PageIndex,
430 ready_neighbours: Option<Neighbours<MessageOrigin>>,
433 message_count: u64,
435 size: u64,
437}
438
439impl<MessageOrigin> Default for BookState<MessageOrigin> {
440 fn default() -> Self {
441 Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
442 }
443}
444
445impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
446 fn from(book: BookState<MessageOrigin>) -> Self {
447 QueueFootprint {
448 pages: book.count,
449 ready_pages: book.end.defensive_saturating_sub(book.begin),
450 storage: Footprint { count: book.message_count, size: book.size },
451 }
452 }
453}
454
455pub trait OnQueueChanged<Id> {
457 fn on_queue_changed(id: Id, fp: QueueFootprint);
459}
460
461impl<Id> OnQueueChanged<Id> for () {
462 fn on_queue_changed(_: Id, _: QueueFootprint) {}
463}
464
465#[frame_support::pallet]
466pub mod pallet {
467 use super::*;
468
469 #[pallet::pallet]
470 pub struct Pallet<T>(_);
471
472 #[pallet::config]
474 pub trait Config: frame_system::Config {
475 type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
477
478 type WeightInfo: WeightInfo;
480
481 type MessageProcessor: ProcessMessage;
492
493 type Size: BaseArithmetic
495 + Unsigned
496 + Copy
497 + Into<u32>
498 + Member
499 + Encode
500 + Decode
501 + MaxEncodedLen
502 + TypeInfo
503 + Default;
504
505 type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;
508
509 type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;
515
516 #[pallet::constant]
522 type HeapSize: Get<Self::Size>;
523
524 #[pallet::constant]
528 type MaxStale: Get<u32>;
529
530 #[pallet::constant]
537 type ServiceWeight: Get<Option<Weight>>;
538
539 #[pallet::constant]
545 type IdleMaxServiceWeight: Get<Option<Weight>>;
546 }
547
548 #[pallet::event]
549 #[pallet::generate_deposit(pub(super) fn deposit_event)]
550 pub enum Event<T: Config> {
551 ProcessingFailed {
553 id: H256,
555 origin: MessageOriginOf<T>,
557 error: ProcessMessageError,
562 },
563 Processed {
565 id: H256,
567 origin: MessageOriginOf<T>,
569 weight_used: Weight,
571 success: bool,
578 },
579 OverweightEnqueued {
581 id: [u8; 32],
583 origin: MessageOriginOf<T>,
585 page_index: PageIndex,
587 message_index: T::Size,
589 },
590 PageReaped {
592 origin: MessageOriginOf<T>,
594 index: PageIndex,
596 },
597 }
598
599 #[pallet::error]
600 pub enum Error<T> {
601 NotReapable,
604 NoPage,
606 NoMessage,
608 AlreadyProcessed,
610 Queued,
612 InsufficientWeight,
614 TemporarilyUnprocessable,
619 QueuePaused,
623 RecursiveDisallowed,
625 }
626
627 #[pallet::storage]
629 pub(super) type BookStateFor<T: Config> =
630 StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;
631
632 #[pallet::storage]
634 pub(super) type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;
635
636 #[pallet::storage]
638 pub(super) type Pages<T: Config> = StorageDoubleMap<
639 _,
640 Twox64Concat,
641 MessageOriginOf<T>,
642 Twox64Concat,
643 PageIndex,
644 Page<T::Size, T::HeapSize>,
645 OptionQuery,
646 >;
647
648 #[pallet::hooks]
649 impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
650 fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
651 if let Some(weight_limit) = T::ServiceWeight::get() {
652 Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
653 } else {
654 Weight::zero()
655 }
656 }
657
658 fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
659 if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
660 Self::service_queues_impl(
662 weight_limit.min(remaining_weight),
663 ServiceQueuesContext::OnIdle,
664 )
665 } else {
666 Weight::zero()
667 }
668 }
669
670 #[cfg(feature = "try-runtime")]
671 fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
672 Self::do_try_state()
673 }
674
675 #[cfg(test)]
677 fn integrity_test() {
678 Self::do_integrity_test().expect("Pallet config is valid; qed")
679 }
680 }
681
682 #[pallet::call]
683 impl<T: Config> Pallet<T> {
684 #[pallet::call_index(0)]
686 #[pallet::weight(T::WeightInfo::reap_page())]
687 pub fn reap_page(
688 origin: OriginFor<T>,
689 message_origin: MessageOriginOf<T>,
690 page_index: PageIndex,
691 ) -> DispatchResult {
692 let _ = ensure_signed(origin)?;
693 Self::do_reap_page(&message_origin, page_index)
694 }
695
696 #[pallet::call_index(1)]
710 #[pallet::weight(
711 T::WeightInfo::execute_overweight_page_updated().max(
712 T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
713 )]
714 pub fn execute_overweight(
715 origin: OriginFor<T>,
716 message_origin: MessageOriginOf<T>,
717 page: PageIndex,
718 index: T::Size,
719 weight_limit: Weight,
720 ) -> DispatchResultWithPostInfo {
721 let _ = ensure_signed(origin)?;
722 let actual_weight =
723 Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
724 Ok(Some(actual_weight).into())
725 }
726 }
727}
728
729#[derive(PartialEq, Debug)]
731enum PageExecutionStatus {
732 Bailed,
734 NoProgress,
738 NoMore,
744}
745
746#[derive(PartialEq, Debug)]
748enum ItemExecutionStatus {
749 Bailed,
751 NoProgress,
755 NoItem,
757 Executed(bool),
761}
762
763#[derive(PartialEq)]
765enum MessageExecutionStatus {
766 InsufficientWeight,
768 Overweight,
770 Processed,
772 Unprocessable { permanent: bool },
774 StackLimitReached,
781}
782
783#[derive(PartialEq)]
786enum ServiceQueuesContext {
787 OnIdle,
789 OnInitialize,
791 ServiceQueues,
793}
794
795impl<T: Config> Pallet<T> {
796 fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
800 if let Some(head) = ServiceHead::<T>::get() {
801 let mut head_book_state = BookStateFor::<T>::get(&head);
802 let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
803 let tail = head_neighbours.prev;
804 head_neighbours.prev = origin.clone();
805 head_book_state.ready_neighbours = Some(head_neighbours);
806 BookStateFor::<T>::insert(&head, head_book_state);
807
808 let mut tail_book_state = BookStateFor::<T>::get(&tail);
809 let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
810 tail_neighbours.next = origin.clone();
811 tail_book_state.ready_neighbours = Some(tail_neighbours);
812 BookStateFor::<T>::insert(&tail, tail_book_state);
813
814 Ok(Neighbours { next: head, prev: tail })
815 } else {
816 ServiceHead::<T>::put(origin);
817 Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
818 }
819 }
820
821 fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
822 if origin == &neighbours.next {
823 debug_assert!(
824 origin == &neighbours.prev,
825 "unknitting from single item ring; outgoing must be only item"
826 );
827 ServiceHead::<T>::kill();
829 } else {
830 BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
831 if let Some(ref mut n) = book_state.ready_neighbours {
832 n.prev = neighbours.prev.clone()
833 }
834 });
835 BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
836 if let Some(ref mut n) = book_state.ready_neighbours {
837 n.next = neighbours.next.clone()
838 }
839 });
840 if let Some(head) = ServiceHead::<T>::get() {
841 if &head == origin {
842 ServiceHead::<T>::put(neighbours.next);
843 }
844 } else {
845 defensive!("`ServiceHead` must be some if there was a ready queue");
846 }
847 }
848 }
849
850 fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
854 if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
855 return None
856 }
857
858 if let Some(head) = ServiceHead::<T>::get() {
859 let mut head_book_state = BookStateFor::<T>::get(&head);
860 if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
861 ServiceHead::<T>::put(&head_neighbours.next);
862 Some(head)
863 } else {
864 None
865 }
866 } else {
867 None
868 }
869 }
870
871 fn max_message_weight(limit: Weight) -> Option<Weight> {
877 let service_weight = T::ServiceWeight::get().unwrap_or_default();
878 let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();
879
880 let max_message_weight =
883 if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };
884
885 if max_message_weight.is_zero() {
886 limit.checked_sub(&Self::single_msg_overhead())
888 } else {
889 max_message_weight.checked_sub(&Self::single_msg_overhead())
890 }
891 }
892
893 fn single_msg_overhead() -> Weight {
895 T::WeightInfo::bump_service_head()
896 .saturating_add(T::WeightInfo::service_queue_base())
897 .saturating_add(
898 T::WeightInfo::service_page_base_completion()
899 .max(T::WeightInfo::service_page_base_no_completion()),
900 )
901 .saturating_add(T::WeightInfo::service_page_item())
902 .saturating_add(T::WeightInfo::ready_ring_unknit())
903 }
904
905 #[cfg(test)]
909 fn do_integrity_test() -> Result<(), String> {
910 ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
911
912 let max_block = T::BlockWeights::get().max_block;
913
914 if let Some(service) = T::ServiceWeight::get() {
915 if Self::max_message_weight(service).is_none() {
916 return Err(format!(
917 "ServiceWeight too low: {}. Must be at least {}",
918 service,
919 Self::single_msg_overhead(),
920 ))
921 }
922
923 if service.any_gt(max_block) {
924 return Err(format!(
925 "ServiceWeight {service} is bigger than max block weight {max_block}"
926 ))
927 }
928 }
929
930 if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
931 if on_idle.any_gt(max_block) {
932 return Err(format!(
933 "IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
934 ))
935 }
936 }
937
938 if let (Some(service_weight), Some(on_idle)) =
939 (T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
940 {
941 if !(service_weight.all_gt(on_idle) ||
942 on_idle.all_gt(service_weight) ||
943 service_weight == on_idle)
944 {
945 return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into())
946 }
947 }
948
949 Ok(())
950 }
951
952 fn do_enqueue_message(
953 origin: &MessageOriginOf<T>,
954 message: BoundedSlice<u8, MaxMessageLenOf<T>>,
955 ) {
956 let mut book_state = BookStateFor::<T>::get(origin);
957 book_state.message_count.saturating_inc();
958 book_state
959 .size
960 .saturating_accrue(message.len() as u64);
962
963 if book_state.end > book_state.begin {
964 debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
965 let last = book_state.end - 1;
967 let mut page = match Pages::<T>::get(origin, last) {
968 Some(p) => p,
969 None => {
970 defensive!("Corruption: referenced page doesn't exist.");
971 return
972 },
973 };
974 if page.try_append_message::<T>(message).is_ok() {
975 Pages::<T>::insert(origin, last, &page);
976 BookStateFor::<T>::insert(origin, book_state);
977 return
978 }
979 } else {
980 debug_assert!(
981 book_state.ready_neighbours.is_none(),
982 "Must not be in ready ring if not ready"
983 );
984 match Self::ready_ring_knit(origin) {
986 Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
987 Err(()) => {
988 defensive!("Ring state invalid when knitting");
989 },
990 }
991 }
992 book_state.end.saturating_inc();
994 book_state.count.saturating_inc();
995 let page = Page::from_message::<T>(message);
996 Pages::<T>::insert(origin, book_state.end - 1, page);
997 BookStateFor::<T>::insert(origin, book_state);
999 }
1000
1001 pub fn do_execute_overweight(
1006 origin: MessageOriginOf<T>,
1007 page_index: PageIndex,
1008 index: T::Size,
1009 weight_limit: Weight,
1010 ) -> Result<Weight, Error<T>> {
1011 match with_service_mutex(|| {
1012 Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
1013 }) {
1014 Err(()) => Err(Error::<T>::RecursiveDisallowed),
1015 Ok(x) => x,
1016 }
1017 }
1018
1019 fn do_execute_overweight_inner(
1021 origin: MessageOriginOf<T>,
1022 page_index: PageIndex,
1023 index: T::Size,
1024 weight_limit: Weight,
1025 ) -> Result<Weight, Error<T>> {
1026 let mut book_state = BookStateFor::<T>::get(&origin);
1027 ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);
1028
1029 let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
1030 let (pos, is_processed, payload) =
1031 page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
1032 let payload_len = payload.len() as u64;
1033 ensure!(
1034 page_index < book_state.begin ||
1035 (page_index == book_state.begin && pos < page.first.into() as usize),
1036 Error::<T>::Queued
1037 );
1038 ensure!(!is_processed, Error::<T>::AlreadyProcessed);
1039 use MessageExecutionStatus::*;
1040 let mut weight_counter = WeightMeter::with_limit(weight_limit);
1041 match Self::process_message_payload(
1042 origin.clone(),
1043 page_index,
1044 index,
1045 payload,
1046 &mut weight_counter,
1047 Weight::MAX,
1048 ) {
1051 Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
1052 StackLimitReached | Unprocessable { permanent: false } =>
1053 Err(Error::<T>::TemporarilyUnprocessable),
1054 Unprocessable { permanent: true } | Processed => {
1055 page.note_processed_at_pos(pos);
1056 book_state.message_count.saturating_dec();
1057 book_state.size.saturating_reduce(payload_len);
1058 let page_weight = if page.remaining.is_zero() {
1059 debug_assert!(
1060 page.remaining_size.is_zero(),
1061 "no messages remaining; no space taken; qed"
1062 );
1063 Pages::<T>::remove(&origin, page_index);
1064 debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
1065 book_state.count.saturating_dec();
1066 T::WeightInfo::execute_overweight_page_removed()
1067 } else {
1070 Pages::<T>::insert(&origin, page_index, page);
1071 T::WeightInfo::execute_overweight_page_updated()
1072 };
1073 BookStateFor::<T>::insert(&origin, &book_state);
1074 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1075 Ok(weight_counter.consumed().saturating_add(page_weight))
1076 },
1077 }
1078 }
1079
1080 fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1082 match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
1083 Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
1084 Ok(x) => x,
1085 }
1086 }
1087
1088 fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1090 let mut book_state = BookStateFor::<T>::get(origin);
1091 ensure!(page_index < book_state.begin, Error::<T>::NotReapable);
1094
1095 let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;
1096
1097 let reapable = page.remaining.is_zero();
1099
1100 let cullable = || {
1102 let total_pages = book_state.count;
1103 let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);
1104
1105 let stale_pages = total_pages - ready_pages;
1113
1114 let max_stale = T::MaxStale::get();
1118
1119 let overflow = match stale_pages.checked_sub(max_stale + 1) {
1122 Some(x) => x + 1,
1123 None => return false,
1124 };
1125
1126 let backlog = (max_stale * max_stale / overflow).max(max_stale);
1135
1136 let watermark = book_state.begin.saturating_sub(backlog);
1137 page_index < watermark
1138 };
1139 ensure!(reapable || cullable(), Error::<T>::NotReapable);
1140
1141 Pages::<T>::remove(origin, page_index);
1142 debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
1143 book_state.count.saturating_dec();
1144 book_state.message_count.saturating_reduce(page.remaining.into() as u64);
1145 book_state.size.saturating_reduce(page.remaining_size.into() as u64);
1146 BookStateFor::<T>::insert(origin, &book_state);
1147 T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
1148 Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });
1149
1150 Ok(())
1151 }
1152
1153 fn service_queue(
1157 origin: MessageOriginOf<T>,
1158 weight: &mut WeightMeter,
1159 overweight_limit: Weight,
1160 ) -> (bool, Option<MessageOriginOf<T>>) {
1161 use PageExecutionStatus::*;
1162 if weight
1163 .try_consume(
1164 T::WeightInfo::service_queue_base()
1165 .saturating_add(T::WeightInfo::ready_ring_unknit()),
1166 )
1167 .is_err()
1168 {
1169 return (false, None)
1170 }
1171
1172 let mut book_state = BookStateFor::<T>::get(&origin);
1173 let mut total_processed = 0;
1174 if T::QueuePausedQuery::is_paused(&origin) {
1175 let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1176 return (false, next_ready)
1177 }
1178
1179 while book_state.end > book_state.begin {
1180 let (processed, status) =
1181 Self::service_page(&origin, &mut book_state, weight, overweight_limit);
1182 total_processed.saturating_accrue(processed);
1183 match status {
1184 Bailed | NoProgress => break,
1186 NoMore => (),
1188 };
1189 book_state.begin.saturating_inc();
1190 }
1191 let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1192 if book_state.begin >= book_state.end {
1193 if let Some(neighbours) = book_state.ready_neighbours.take() {
1195 Self::ready_ring_unknit(&origin, neighbours);
1196 } else if total_processed > 0 {
1197 defensive!("Freshly processed queue must have been ready");
1198 }
1199 }
1200 BookStateFor::<T>::insert(&origin, &book_state);
1201 if total_processed > 0 {
1202 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1203 }
1204 (total_processed > 0, next_ready)
1205 }
1206
1207 fn service_page(
1211 origin: &MessageOriginOf<T>,
1212 book_state: &mut BookStateOf<T>,
1213 weight: &mut WeightMeter,
1214 overweight_limit: Weight,
1215 ) -> (u32, PageExecutionStatus) {
1216 use PageExecutionStatus::*;
1217 if weight
1218 .try_consume(
1219 T::WeightInfo::service_page_base_completion()
1220 .max(T::WeightInfo::service_page_base_no_completion()),
1221 )
1222 .is_err()
1223 {
1224 return (0, Bailed)
1225 }
1226
1227 let page_index = book_state.begin;
1228 let mut page = match Pages::<T>::get(origin, page_index) {
1229 Some(p) => p,
1230 None => {
1231 defensive!("message-queue: referenced page not found");
1232 return (0, NoMore)
1233 },
1234 };
1235
1236 let mut total_processed = 0;
1237
1238 let status = loop {
1240 use ItemExecutionStatus::*;
1241 match Self::service_page_item(
1242 origin,
1243 page_index,
1244 book_state,
1245 &mut page,
1246 weight,
1247 overweight_limit,
1248 ) {
1249 Bailed => break PageExecutionStatus::Bailed,
1250 NoItem => break PageExecutionStatus::NoMore,
1251 NoProgress => break PageExecutionStatus::NoProgress,
1252 Executed(true) => total_processed.saturating_inc(),
1254 Executed(false) => (),
1255 }
1256 };
1257
1258 if page.is_complete() {
1259 debug_assert!(status != Bailed, "we never bail if a page became complete");
1260 Pages::<T>::remove(origin, page_index);
1261 debug_assert!(book_state.count > 0, "completing a page implies there are pages");
1262 book_state.count.saturating_dec();
1263 } else {
1264 Pages::<T>::insert(origin, page_index, page);
1265 }
1266 (total_processed, status)
1267 }
1268
1269 pub(crate) fn service_page_item(
1271 origin: &MessageOriginOf<T>,
1272 page_index: PageIndex,
1273 book_state: &mut BookStateOf<T>,
1274 page: &mut PageOf<T>,
1275 weight: &mut WeightMeter,
1276 overweight_limit: Weight,
1277 ) -> ItemExecutionStatus {
1278 use MessageExecutionStatus::*;
1279 if page.is_complete() {
1282 return ItemExecutionStatus::NoItem
1283 }
1284 if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
1285 return ItemExecutionStatus::Bailed
1286 }
1287
1288 let payload = &match page.peek_first() {
1289 Some(m) => m,
1290 None => return ItemExecutionStatus::NoItem,
1291 }[..];
1292 let payload_len = payload.len() as u64;
1293
1294 Pages::<T>::insert(origin, page_index, &*page);
1296 BookStateFor::<T>::insert(origin, &*book_state);
1297
1298 let res = Self::process_message_payload(
1299 origin.clone(),
1300 page_index,
1301 page.first_index,
1302 payload,
1303 weight,
1304 overweight_limit,
1305 );
1306
1307 *book_state = BookStateFor::<T>::get(origin);
1309 if let Some(new_page) = Pages::<T>::get(origin, page_index) {
1310 *page = new_page;
1311 } else {
1312 defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
1313 return ItemExecutionStatus::NoItem
1314 };
1315
1316 let is_processed = match res {
1317 InsufficientWeight => return ItemExecutionStatus::Bailed,
1318 Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
1319 Processed | Unprocessable { permanent: true } | StackLimitReached => true,
1320 Overweight => false,
1321 };
1322
1323 if is_processed {
1324 book_state.message_count.saturating_dec();
1325 book_state.size.saturating_reduce(payload_len as u64);
1326 }
1327 page.skip_first(is_processed);
1328 ItemExecutionStatus::Executed(is_processed)
1329 }
1330
1331 #[cfg(any(test, feature = "try-runtime", feature = "std"))]
1351 pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
1352 ensure!(
1354 BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
1355 "Memory Corruption in BookStateFor"
1356 );
1357 ensure!(
1359 Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
1360 "Memory Corruption in Pages"
1361 );
1362
1363 for book in BookStateFor::<T>::iter_values() {
1365 ensure!(book.end >= book.begin, "Invariant");
1366 ensure!(book.end < 1 << 30, "Likely overflow or corruption");
1367 ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
1368 ensure!(book.size < 1 << 30, "Likely overflow or corruption");
1369 ensure!(book.count < 1 << 30, "Likely overflow or corruption");
1370
1371 let fp: QueueFootprint = book.into();
1372 ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
1373 }
1374
1375 let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };
1377
1378 while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
1379 ensure!(
1380 BookStateFor::<T>::contains_key(&head),
1381 "Service head must point to an existing book"
1382 );
1383
1384 let head_book_state = BookStateFor::<T>::get(&head);
1385 ensure!(
1386 head_book_state.message_count > 0,
1387 "There must be some messages if in ReadyRing"
1388 );
1389 ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
1390 ensure!(
1391 head_book_state.end > head_book_state.begin,
1392 "End > Begin if unprocessed messages exists"
1393 );
1394 ensure!(
1395 head_book_state.ready_neighbours.is_some(),
1396 "There must be neighbours if in ReadyRing"
1397 );
1398
1399 if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
1400 ensure!(
1401 head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
1402 "Can only happen if only queue in ReadyRing"
1403 );
1404 }
1405
1406 for page_index in head_book_state.begin..head_book_state.end {
1407 let page = Pages::<T>::get(&head, page_index).unwrap();
1408 let remaining_messages = page.remaining;
1409 let mut counted_remaining_messages: u32 = 0;
1410 ensure!(
1411 remaining_messages > 0.into(),
1412 "These must be some messages that have not been processed yet!"
1413 );
1414
1415 for i in 0..u32::MAX {
1416 if let Some((_, processed, _)) = page.peek_index(i as usize) {
1417 if !processed {
1418 counted_remaining_messages += 1;
1419 }
1420 } else {
1421 break
1422 }
1423 }
1424
1425 ensure!(
1426 remaining_messages.into() == counted_remaining_messages,
1427 "Memory Corruption"
1428 );
1429 }
1430
1431 if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
1432 break
1433 }
1434 }
1435 Ok(())
1436 }
1437
1438 #[cfg(feature = "std")]
1454 pub fn debug_info() -> String {
1455 let mut info = String::new();
1456 for (origin, book_state) in BookStateFor::<T>::iter() {
1457 let mut queue = format!("queue {:?}:\n", &origin);
1458 let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
1459 pages.sort_by(|(a, _), (b, _)| a.cmp(b));
1460 for (page_index, mut page) in pages.into_iter() {
1461 let page_info = if book_state.begin == page_index { ">" } else { " " };
1462 let mut page_info = format!(
1463 "{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
1464 page_info, page_index, page.first, page.last, page.remaining
1465 );
1466 for i in 0..u32::MAX {
1467 if let Some((_, processed, message)) =
1468 page.peek_index(i.try_into().expect("std-only code"))
1469 {
1470 let msg = String::from_utf8_lossy(message);
1471 if processed {
1472 page_info.push('*');
1473 }
1474 page_info.push_str(&format!("{:?}, ", msg));
1475 page.skip_first(true);
1476 } else {
1477 break
1478 }
1479 }
1480 page_info.push_str("]\n");
1481 queue.push_str(&page_info);
1482 }
1483 info.push_str(&queue);
1484 }
1485 info
1486 }
1487
1488 fn process_message_payload(
1496 origin: MessageOriginOf<T>,
1497 page_index: PageIndex,
1498 message_index: T::Size,
1499 message: &[u8],
1500 meter: &mut WeightMeter,
1501 overweight_limit: Weight,
1502 ) -> MessageExecutionStatus {
1503 let mut id = sp_io::hashing::blake2_256(message);
1504 use ProcessMessageError::*;
1505 let prev_consumed = meter.consumed();
1506
1507 let transaction =
1508 storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
1509 let res =
1510 T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
1511 match &res {
1512 Ok(_) => TransactionOutcome::Commit(Ok(res)),
1513 Err(_) => TransactionOutcome::Rollback(Ok(res)),
1514 }
1515 });
1516
1517 let transaction = match transaction {
1518 Ok(result) => result,
1519 _ => {
1520 defensive!(
1521 "Error occurred processing message, storage changes will be rolled back"
1522 );
1523 return MessageExecutionStatus::Unprocessable { permanent: true }
1524 },
1525 };
1526
1527 match transaction {
1528 Err(Overweight(w)) if w.any_gt(overweight_limit) => {
1529 Self::deposit_event(Event::<T>::OverweightEnqueued {
1531 id,
1532 origin,
1533 page_index,
1534 message_index,
1535 });
1536 MessageExecutionStatus::Overweight
1537 },
1538 Err(Overweight(_)) => {
1539 MessageExecutionStatus::InsufficientWeight
1542 },
1543 Err(Yield) => {
1544 MessageExecutionStatus::Unprocessable { permanent: false }
1546 },
1547 Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
1548 Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1550 MessageExecutionStatus::Unprocessable { permanent: true }
1551 },
1552 Err(error @ StackLimitReached) => {
1553 Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1554 MessageExecutionStatus::StackLimitReached
1555 },
1556 Ok(success) => {
1557 let weight_used = meter.consumed().saturating_sub(prev_consumed);
1559 Self::deposit_event(Event::<T>::Processed {
1560 id: id.into(),
1561 origin,
1562 weight_used,
1563 success,
1564 });
1565 MessageExecutionStatus::Processed
1566 },
1567 }
1568 }
1569
1570 fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
1571 let mut weight = WeightMeter::with_limit(weight_limit);
1572
1573 let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
1575 if matches!(context, ServiceQueuesContext::OnInitialize) {
1576 defensive!("Not enough weight to service a single message.");
1577 }
1578 Weight::zero()
1579 });
1580
1581 match with_service_mutex(|| {
1582 let mut next = match Self::bump_service_head(&mut weight) {
1583 Some(h) => h,
1584 None => return weight.consumed(),
1585 };
1586 let mut last_no_progress = None;
1590
1591 loop {
1592 let (progressed, n) =
1593 Self::service_queue(next.clone(), &mut weight, overweight_limit);
1594 next = match n {
1595 Some(n) =>
1596 if !progressed {
1597 if last_no_progress == Some(n.clone()) {
1598 break
1599 }
1600 if last_no_progress.is_none() {
1601 last_no_progress = Some(next.clone())
1602 }
1603 n
1604 } else {
1605 last_no_progress = None;
1606 n
1607 },
1608 None => break,
1609 }
1610 }
1611 weight.consumed()
1612 }) {
1613 Err(()) => weight.consumed(),
1614 Ok(w) => w,
1615 }
1616 }
1617}
1618
1619pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
1621 environmental::environmental!(token: Option<()>);
1623
1624 token::using_once(&mut Some(()), || {
1625 let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;
1627
1628 defer! {
1630 token::with(|t| {
1631 *t = Some(hold);
1632 });
1633 }
1634
1635 Ok(f())
1636 })
1637}
1638
1639pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
1641impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
1642 fn get() -> u32 {
1643 T::max_encoded_len() as u32
1644 }
1645}
1646
1647pub struct MaxMessageLen<Origin, Size, HeapSize>(
1649 core::marker::PhantomData<(Origin, Size, HeapSize)>,
1650);
1651impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
1652 for MaxMessageLen<Origin, Size, HeapSize>
1653{
1654 fn get() -> u32 {
1655 (HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
1656 }
1657}
1658
1659pub type MaxMessageLenOf<T> =
1661 MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
1662pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
1664pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
1666pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
1668pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
1670pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;
1672
1673pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
1676impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
1677 fn get() -> u32 {
1678 T::get().into()
1679 }
1680}
1681
1682impl<T: Config> ServiceQueues for Pallet<T> {
1683 type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);
1684
1685 fn service_queues(weight_limit: Weight) -> Weight {
1686 Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
1687 }
1688
1689 fn execute_overweight(
1693 weight_limit: Weight,
1694 (message_origin, page, index): Self::OverweightMessageAddress,
1695 ) -> Result<Weight, ExecuteOverweightError> {
1696 let mut weight = WeightMeter::with_limit(weight_limit);
1697 if weight
1698 .try_consume(
1699 T::WeightInfo::execute_overweight_page_removed()
1700 .max(T::WeightInfo::execute_overweight_page_updated()),
1701 )
1702 .is_err()
1703 {
1704 return Err(ExecuteOverweightError::InsufficientWeight)
1705 }
1706
1707 Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
1708 |e| match e {
1709 Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
1710 Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
1711 Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
1712 Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
1713 ExecuteOverweightError::NotFound,
1714 Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
1715 _ => ExecuteOverweightError::Other,
1716 },
1717 )
1718 }
1719}
1720
1721impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
1722 type MaxMessageLen =
1723 MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;
1724
1725 fn enqueue_message(
1726 message: BoundedSlice<u8, Self::MaxMessageLen>,
1727 origin: <T::MessageProcessor as ProcessMessage>::Origin,
1728 ) {
1729 Self::do_enqueue_message(&origin, message);
1730 let book_state = BookStateFor::<T>::get(&origin);
1731 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1732 }
1733
1734 fn enqueue_messages<'a>(
1735 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
1736 origin: <T::MessageProcessor as ProcessMessage>::Origin,
1737 ) {
1738 for message in messages {
1739 Self::do_enqueue_message(&origin, message);
1740 }
1741 let book_state = BookStateFor::<T>::get(&origin);
1742 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1743 }
1744
1745 fn sweep_queue(origin: MessageOriginOf<T>) {
1746 if !BookStateFor::<T>::contains_key(&origin) {
1747 return
1748 }
1749 let mut book_state = BookStateFor::<T>::get(&origin);
1750 book_state.begin = book_state.end;
1751 if let Some(neighbours) = book_state.ready_neighbours.take() {
1752 Self::ready_ring_unknit(&origin, neighbours);
1753 }
1754 BookStateFor::<T>::insert(&origin, &book_state);
1755 }
1756
1757 fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
1758 BookStateFor::<T>::get(&origin).into()
1759 }
1760}