1#![deny(missing_docs)]
197#![cfg_attr(not(feature = "std"), no_std)]
198
199mod benchmarking;
200mod integration_test;
201mod mock;
202pub mod mock_helpers;
203mod tests;
204pub mod weights;
205
206extern crate alloc;
207
208use alloc::{vec, vec::Vec};
209use codec::{Codec, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
210use core::{fmt::Debug, ops::Deref};
211use frame_support::{
212 defensive,
213 pallet_prelude::*,
214 traits::{
215 Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
216 ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
217 QueuePausedQuery, ServiceQueues,
218 },
219 BoundedSlice, CloneNoBound, DefaultNoBound,
220};
221use frame_system::pallet_prelude::*;
222pub use pallet::*;
223use scale_info::TypeInfo;
224use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
225use sp_core::{defer, H256};
226use sp_runtime::{
227 traits::{One, Zero},
228 SaturatedConversion, Saturating, TransactionOutcome,
229};
230use sp_weights::WeightMeter;
231pub use weights::WeightInfo;
232
/// Index of a page inside one queue's book; used as the second key of the `Pages` storage map
/// and for the `begin`/`end` cursors of `BookState`.
type PageIndex = u32;
235
/// Header that is encoded and written to a page's heap directly in front of each message
/// payload.
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
    /// Length, in bytes, of the payload that follows this header on the heap.
    payload_len: Size,
    /// Whether the message has been marked as processed (see `Page::skip_first` and
    /// `Page::note_processed_at_pos`).
    is_processed: bool,
}
245
/// A page of messages: a bounded byte heap of `ItemHeader` + payload pairs together with the
/// bookkeeping needed to walk it.
#[derive(
    CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
    /// Number of messages on this page that are not yet marked as processed. The page is
    /// complete once this reaches zero.
    remaining: Size,
    /// Sum of the payload lengths of the messages counted by `remaining`.
    remaining_size: Size,
    /// Number of items that `skip_first` has already moved past, i.e. the index of the item
    /// that `first` points at.
    first_index: Size,
    /// Heap offset of the header of the next item returned by `peek_first`.
    first: Size,
    /// Heap offset of the header of the most recently appended item.
    last: Size,
    /// The raw bytes: a sequence of encoded `ItemHeader`s each followed by its payload.
    heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}
270
271impl<
272 Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
273 HeapSize: Get<Size>,
274 > Page<Size, HeapSize>
275{
276 fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
278 let payload_len = message.len();
279 let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
280 let payload_len = payload_len.saturated_into();
281 let header = ItemHeader::<Size> { payload_len, is_processed: false };
282
283 let mut heap = Vec::with_capacity(data_len);
284 header.using_encoded(|h| heap.extend_from_slice(h));
285 heap.extend_from_slice(message.deref());
286
287 Page {
288 remaining: One::one(),
289 remaining_size: payload_len,
290 first_index: Zero::zero(),
291 first: Zero::zero(),
292 last: Zero::zero(),
293 heap: BoundedVec::defensive_truncate_from(heap),
294 }
295 }
296
297 fn try_append_message<T: Config>(
299 &mut self,
300 message: BoundedSlice<u8, MaxMessageLenOf<T>>,
301 ) -> Result<(), ()> {
302 let pos = self.heap.len();
303 let payload_len = message.len();
304 let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
305 let payload_len = payload_len.saturated_into();
306 let header = ItemHeader::<Size> { payload_len, is_processed: false };
307 let heap_size: u32 = HeapSize::get().into();
308 if (heap_size as usize).saturating_sub(self.heap.len()) < data_len {
309 return Err(())
311 }
312
313 let mut heap = core::mem::take(&mut self.heap).into_inner();
314 header.using_encoded(|h| heap.extend_from_slice(h));
315 heap.extend_from_slice(message.deref());
316 self.heap = BoundedVec::defensive_truncate_from(heap);
317 self.last = pos.saturated_into();
318 self.remaining.saturating_inc();
319 self.remaining_size.saturating_accrue(payload_len);
320 Ok(())
321 }
322
323 fn peek_first(&self) -> Option<BoundedSlice<u8, IntoU32<HeapSize, Size>>> {
327 if self.first > self.last {
328 return None
329 }
330 let f = (self.first.into() as usize).min(self.heap.len());
331 let mut item_slice = &self.heap[f..];
332 if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
333 let payload_len = h.payload_len.into() as usize;
334 if payload_len <= item_slice.len() {
335 return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]))
338 }
339 }
340 defensive!("message-queue: heap corruption");
341 None
342 }
343
344 fn skip_first(&mut self, is_processed: bool) {
346 let f = (self.first.into() as usize).min(self.heap.len());
347 if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
348 if is_processed && !h.is_processed {
349 h.is_processed = true;
350 h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
351 self.remaining.saturating_dec();
352 self.remaining_size.saturating_reduce(h.payload_len);
353 }
354 self.first
355 .saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
356 self.first.saturating_accrue(h.payload_len);
357 self.first_index.saturating_inc();
358 }
359 }
360
361 fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
363 let mut pos = 0;
364 let mut item_slice = &self.heap[..];
365 let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
366 for _ in 0..index {
367 let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
368 let item_len = h.payload_len.into() as usize;
369 if item_slice.len() < item_len {
370 return None
371 }
372 item_slice = &item_slice[item_len..];
373 pos.saturating_accrue(header_len.saturating_add(item_len));
374 }
375 let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
376 if item_slice.len() < h.payload_len.into() as usize {
377 return None
378 }
379 item_slice = &item_slice[..h.payload_len.into() as usize];
380 Some((pos, h.is_processed, item_slice))
381 }
382
383 fn note_processed_at_pos(&mut self, pos: usize) {
388 if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
389 if !h.is_processed {
390 h.is_processed = true;
391 h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
392 self.remaining.saturating_dec();
393 self.remaining_size.saturating_reduce(h.payload_len);
394 }
395 }
396 }
397
398 fn is_complete(&self) -> bool {
400 self.remaining.is_zero()
401 }
402}
403
/// The two queues adjacent to a queue in the ready ring (a doubly linked ring of queue
/// origins).
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
    /// The previous queue in the ring.
    prev: MessageOrigin,
    /// The next queue in the ring.
    next: MessageOrigin,
}
412
/// Per-queue bookkeeping: which pages exist, which of them are still to be serviced, and how
/// much unprocessed data the queue holds.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
pub struct BookState<MessageOrigin> {
    /// Index of the page that is serviced next. Pages below this index are no longer ready.
    begin: PageIndex,
    /// One past the index of the last page; new pages are created at this index.
    end: PageIndex,
    /// Number of pages currently in storage for this queue, including pages below `begin`
    /// that have not been removed yet.
    count: PageIndex,
    /// The queue's neighbours in the ready ring; `None` when the queue is not in the ring.
    ready_neighbours: Option<Neighbours<MessageOrigin>>,
    /// Number of messages in the queue that are not yet processed.
    message_count: u64,
    /// Total payload size, in bytes, of the messages counted by `message_count`.
    size: u64,
}
438
439impl<MessageOrigin> Default for BookState<MessageOrigin> {
440 fn default() -> Self {
441 Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
442 }
443}
444
445impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
446 fn from(book: BookState<MessageOrigin>) -> Self {
447 QueueFootprint {
448 pages: book.count,
449 ready_pages: book.end.defensive_saturating_sub(book.begin),
450 storage: Footprint { count: book.message_count, size: book.size },
451 }
452 }
453}
454
/// Handler that is notified when the footprint of a queue changes.
pub trait OnQueueChanged<Id> {
    /// Called with the queue's `id` and its new footprint `fp`. The pallet invokes this after
    /// it has written the updated `BookState` of the queue to storage.
    fn on_queue_changed(id: Id, fp: QueueFootprint);
}
460
461impl<Id> OnQueueChanged<Id> for () {
462 fn on_queue_changed(_: Id, _: QueueFootprint) {}
463}
464
/// Interface for forcing the service head onto a specific queue.
// NOTE(review): only the signature is visible here. It mirrors `Pallet::set_service_head`,
// so the semantics described below are presumed to match that function; confirm against the
// implementation.
pub trait ForceSetHead<O> {
    /// Try to make `origin` the queue that is serviced next, charging `weight`.
    ///
    /// Presumably returns `Ok(true)` if the head was set, `Ok(false)` if the queue was not
    /// eligible, and `Err(())` if `weight` was insufficient. TODO confirm.
    fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}
475
/// The pallet definition: configuration, events, errors, storage, hooks and calls.
#[frame_support::pallet]
pub mod pallet {
    use super::*;

    /// The pallet type on which all hooks, calls and helper functions are implemented.
    #[pallet::pallet]
    pub struct Pallet<T>(_);

    /// Configuration of the pallet.
    #[pallet::config]
    pub trait Config: frame_system::Config {
        /// The overarching runtime event type that this pallet's `Event` converts into.
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// Weight functions used to charge for calls and for each servicing step.
        type WeightInfo: WeightInfo;

        /// Processor that is handed every message payload via `process_message`.
        type MessageProcessor: ProcessMessage;

        /// Integer type used for heap offsets, payload lengths and message indices within a
        /// page.
        type Size: BaseArithmetic
            + Unsigned
            + Copy
            + Into<u32>
            + Member
            + Encode
            + Decode
            + MaxEncodedLen
            + TypeInfo
            + Default;

        /// Handler that is called with the new footprint after a queue has changed.
        type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

        /// Query that decides whether a queue is paused. Paused queues are not serviced and
        /// their overweight messages cannot be executed.
        type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

        /// Maximum size, in bytes, of a page's heap. A message together with its item header
        /// must fit into this.
        #[pallet::constant]
        type HeapSize: Get<Self::Size>;

        /// Number of stale pages (pages below `begin` that are still stored) that a queue may
        /// hold before `reap_page` is allowed to cull pages that still contain unprocessed
        /// messages.
        #[pallet::constant]
        type MaxStale: Get<u32>;

        /// Weight limit for servicing queues in `on_initialize`. `None` disables servicing in
        /// `on_initialize`.
        #[pallet::constant]
        type ServiceWeight: Get<Option<Weight>>;

        /// Upper weight limit for servicing queues in `on_idle`; the smaller of this and the
        /// remaining block weight is used. `None` disables servicing in `on_idle`.
        #[pallet::constant]
        type IdleMaxServiceWeight: Get<Option<Weight>>;
    }

    /// Events emitted by this pallet.
    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// The message processor rejected a message with an error (bad format, corrupt,
        /// unsupported or stack limit reached).
        ProcessingFailed {
            /// Identifier of the message: its `blake2_256` hash, which the processor is
            /// allowed to overwrite.
            id: H256,
            /// The queue the message belongs to.
            origin: MessageOriginOf<T>,
            /// The error returned by the processor.
            error: ProcessMessageError,
        },
        /// The message processor returned `Ok` for a message.
        Processed {
            /// Identifier of the message: its `blake2_256` hash, which the processor is
            /// allowed to overwrite.
            id: H256,
            /// The queue the message belongs to.
            origin: MessageOriginOf<T>,
            /// Weight the meter consumed while processing this message.
            weight_used: Weight,
            /// The boolean returned by the processor inside its `Ok` result.
            success: bool,
        },
        /// A message required more weight than the overweight limit and was left on its page
        /// unprocessed.
        OverweightEnqueued {
            /// Identifier of the message: its `blake2_256` hash, which the processor is
            /// allowed to overwrite.
            id: [u8; 32],
            /// The queue the message belongs to.
            origin: MessageOriginOf<T>,
            /// Index of the page holding the message.
            page_index: PageIndex,
            /// Index of the message within that page.
            message_index: T::Size,
        },
        /// A page was removed through `reap_page`.
        PageReaped {
            /// The queue the page belonged to.
            origin: MessageOriginOf<T>,
            /// Index of the removed page.
            index: PageIndex,
        },
    }

    /// Errors returned by this pallet's calls.
    #[pallet::error]
    pub enum Error<T> {
        /// The page cannot be reaped: it is not below the queue's `begin`, or it still has
        /// unprocessed messages and is not old enough to be culled.
        NotReapable,
        /// No page exists at the given index.
        NoPage,
        /// The page has no message at the given index.
        NoMessage,
        /// The message has already been processed.
        AlreadyProcessed,
        /// The message is still queued for regular servicing and therefore cannot be executed
        /// as overweight.
        Queued,
        /// The given weight limit does not suffice to process the message.
        InsufficientWeight,
        /// The message cannot be processed right now (the processor yielded or hit the stack
        /// limit) and may be retried later.
        TemporarilyUnprocessable,
        /// The queue is paused.
        QueuePaused,
        /// The operation was attempted recursively while the service mutex was already held.
        RecursiveDisallowed,
    }

    /// The book state of each queue, keyed by message origin.
    #[pallet::storage]
    pub type BookStateFor<T: Config> =
        StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

    /// The origin of the queue that is serviced next; `None` when the ready ring is empty.
    #[pallet::storage]
    pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

    /// All message pages, keyed by message origin and page index.
    #[pallet::storage]
    pub type Pages<T: Config> = StorageDoubleMap<
        _,
        Twox64Concat,
        MessageOriginOf<T>,
        Twox64Concat,
        PageIndex,
        Page<T::Size, T::HeapSize>,
        OptionQuery,
    >;

    #[pallet::hooks]
    impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
        // Service queues with `ServiceWeight` as the limit, if one is configured.
        fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
            if let Some(weight_limit) = T::ServiceWeight::get() {
                Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
            } else {
                Weight::zero()
            }
        }

        // Service queues with whatever is smaller: `IdleMaxServiceWeight` or the weight that is
        // left in the block. Does nothing if `IdleMaxServiceWeight` is `None`.
        fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
            if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
                Self::service_queues_impl(
                    weight_limit.min(remaining_weight),
                    ServiceQueuesContext::OnIdle,
                )
            } else {
                Weight::zero()
            }
        }

        #[cfg(feature = "try-runtime")]
        fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
            Self::do_try_state()
        }

        // Validate the pallet configuration; only compiled for tests.
        #[cfg(test)]
        fn integrity_test() {
            Self::do_integrity_test().expect("Pallet config is valid; qed")
        }
    }

    #[pallet::call]
    impl<T: Config> Pallet<T> {
        /// Remove the page `page_index` of queue `message_origin` from storage, provided it is
        /// reapable (see `Error::NotReapable`).
        ///
        /// Can be called by any signed origin.
        #[pallet::call_index(0)]
        #[pallet::weight(T::WeightInfo::reap_page())]
        pub fn reap_page(
            origin: OriginFor<T>,
            message_origin: MessageOriginOf<T>,
            page_index: PageIndex,
        ) -> DispatchResult {
            let _ = ensure_signed(origin)?;
            Self::do_reap_page(&message_origin, page_index)
        }

        /// Execute an overweight message.
        ///
        /// - `origin`: any signed origin.
        /// - `message_origin`: the queue the message belongs to.
        /// - `page`: index of the page holding the message.
        /// - `index`: index of the message within that page.
        /// - `weight_limit`: maximum weight the message may consume.
        ///
        /// The pre-dispatch weight is `weight_limit` plus the larger of the two
        /// `execute_overweight_*` overheads; the weight actually used is returned in the post
        /// info.
        #[pallet::call_index(1)]
        #[pallet::weight(
            T::WeightInfo::execute_overweight_page_updated().max(
            T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
        )]
        pub fn execute_overweight(
            origin: OriginFor<T>,
            message_origin: MessageOriginOf<T>,
            page: PageIndex,
            index: T::Size,
            weight_limit: Weight,
        ) -> DispatchResultWithPostInfo {
            let _ = ensure_signed(origin)?;
            let actual_weight =
                Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
            Ok(Some(actual_weight).into())
        }
    }
}
739
/// Outcome of servicing one page (`service_page`).
#[derive(PartialEq, Debug)]
enum PageExecutionStatus {
    /// Servicing stopped because the weight meter could not cover the next step. The queue
    /// loop stops and stays on this page.
    Bailed,
    /// Servicing stopped because the current message is temporarily unprocessable. The queue
    /// loop stops and stays on this page.
    NoProgress,
    /// The page has no further item to service (or could not be found); the queue loop moves
    /// on to the next page.
    NoMore,
}
756
/// Outcome of servicing one item of a page (`service_page_item`).
#[derive(PartialEq, Debug)]
enum ItemExecutionStatus {
    /// The weight meter could not cover the item; the item was left in place.
    Bailed,
    /// The message is temporarily unprocessable; the item was left in place.
    NoProgress,
    /// There is no item to service on the page.
    NoItem,
    /// The item was moved past. The flag is `true` if it was marked as processed and `false`
    /// if it was skipped as overweight.
    Executed(bool),
}
773
/// Outcome of handing one message payload to the processor (`process_message_payload`).
#[derive(PartialEq)]
enum MessageExecutionStatus {
    /// The processor reported the message as overweight, but the required weight does not
    /// exceed the overweight limit.
    InsufficientWeight,
    /// The processor reported the message as overweight and the required weight exceeds the
    /// overweight limit.
    Overweight,
    /// The processor returned `Ok`.
    Processed,
    /// The message could not be processed; `permanent` tells whether retrying is pointless.
    Unprocessable { permanent: bool },
    /// The processor reported that the stack limit was reached.
    StackLimitReached,
}
793
/// The context from which `service_queues_impl` is invoked.
#[derive(PartialEq)]
enum ServiceQueuesContext {
    /// Invoked from the `on_idle` hook.
    OnIdle,
    /// Invoked from the `on_initialize` hook.
    OnInitialize,
    /// Presumably: invoked through the `ServiceQueues` trait — the call site is not visible
    /// in this part of the file; TODO confirm.
    ServiceQueues,
}
805
806impl<T: Config> Pallet<T> {
807 fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
811 if let Some(head) = ServiceHead::<T>::get() {
812 let mut head_book_state = BookStateFor::<T>::get(&head);
813 let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
814 let tail = head_neighbours.prev;
815 head_neighbours.prev = origin.clone();
816 head_book_state.ready_neighbours = Some(head_neighbours);
817 BookStateFor::<T>::insert(&head, head_book_state);
818
819 let mut tail_book_state = BookStateFor::<T>::get(&tail);
820 let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
821 tail_neighbours.next = origin.clone();
822 tail_book_state.ready_neighbours = Some(tail_neighbours);
823 BookStateFor::<T>::insert(&tail, tail_book_state);
824
825 Ok(Neighbours { next: head, prev: tail })
826 } else {
827 ServiceHead::<T>::put(origin);
828 Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
829 }
830 }
831
832 fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
833 if origin == &neighbours.next {
834 debug_assert!(
835 origin == &neighbours.prev,
836 "unknitting from single item ring; outgoing must be only item"
837 );
838 ServiceHead::<T>::kill();
840 } else {
841 BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
842 if let Some(ref mut n) = book_state.ready_neighbours {
843 n.prev = neighbours.prev.clone()
844 }
845 });
846 BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
847 if let Some(ref mut n) = book_state.ready_neighbours {
848 n.next = neighbours.next.clone()
849 }
850 });
851 if let Some(head) = ServiceHead::<T>::get() {
852 if &head == origin {
853 ServiceHead::<T>::put(neighbours.next);
854 }
855 } else {
856 defensive!("`ServiceHead` must be some if there was a ready queue");
857 }
858 }
859 }
860
861 fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
865 if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
866 return None
867 }
868
869 if let Some(head) = ServiceHead::<T>::get() {
870 let mut head_book_state = BookStateFor::<T>::get(&head);
871 if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
872 ServiceHead::<T>::put(&head_neighbours.next);
873 Some(head)
874 } else {
875 defensive!("The head must point to a queue in the ready ring");
876 None
877 }
878 } else {
879 None
880 }
881 }
882
883 fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
884 if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
885 return Err(())
886 }
887
888 if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
890 ServiceHead::<T>::put(queue);
891 Ok(true)
892 } else {
893 Ok(false)
894 }
895 }
896
897 fn max_message_weight(limit: Weight) -> Option<Weight> {
903 let service_weight = T::ServiceWeight::get().unwrap_or_default();
904 let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();
905
906 let max_message_weight =
909 if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };
910
911 if max_message_weight.is_zero() {
912 limit.checked_sub(&Self::single_msg_overhead())
914 } else {
915 max_message_weight.checked_sub(&Self::single_msg_overhead())
916 }
917 }
918
919 fn single_msg_overhead() -> Weight {
921 T::WeightInfo::bump_service_head()
922 .saturating_add(T::WeightInfo::service_queue_base())
923 .saturating_add(
924 T::WeightInfo::service_page_base_completion()
925 .max(T::WeightInfo::service_page_base_no_completion()),
926 )
927 .saturating_add(T::WeightInfo::service_page_item())
928 .saturating_add(T::WeightInfo::ready_ring_unknit())
929 }
930
/// Check the pallet configuration and return a description of the first problem found.
///
/// Verifies that messages can have a non-zero length, that `ServiceWeight` covers the
/// per-message overhead, that neither service weight exceeds the maximum block weight, and
/// that the two service weights are comparable in all dimensions.
#[cfg(test)]
fn do_integrity_test() -> Result<(), String> {
    ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");

    let max_block = T::BlockWeights::get().max_block;
    let maybe_service = T::ServiceWeight::get();
    let maybe_on_idle = T::IdleMaxServiceWeight::get();

    if let Some(service) = maybe_service {
        ensure!(
            Self::max_message_weight(service).is_some(),
            format!(
                "ServiceWeight too low: {}. Must be at least {}",
                service,
                Self::single_msg_overhead(),
            )
        );
        ensure!(
            !service.any_gt(max_block),
            format!("ServiceWeight {service} is bigger than max block weight {max_block}")
        );
    }

    if let Some(on_idle) = maybe_on_idle {
        ensure!(
            !on_idle.any_gt(max_block),
            format!("IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}")
        );
    }

    if let (Some(service_weight), Some(on_idle)) = (maybe_service, maybe_on_idle) {
        // One weight must dominate the other in every dimension, or both must be equal.
        let comparable = service_weight.all_gt(on_idle) ||
            on_idle.all_gt(service_weight) ||
            service_weight == on_idle;
        ensure!(
            comparable,
            "One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal."
        );
    }

    Ok(())
}
977
978 fn do_enqueue_messages<'a>(
979 origin: &MessageOriginOf<T>,
980 messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
981 ) {
982 let mut book_state = BookStateFor::<T>::get(origin);
983
984 let mut maybe_page = None;
985 if book_state.end > book_state.begin {
987 debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
988 maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
989 defensive!("Corruption: referenced page doesn't exist.");
990 None
991 });
992 }
993
994 for message in messages {
995 if let Some(mut page) = maybe_page {
997 maybe_page = match page.try_append_message::<T>(message) {
998 Ok(_) => Some(page),
999 Err(_) => {
1000 Pages::<T>::insert(origin, book_state.end - 1, page);
1003 None
1004 },
1005 }
1006 }
1007 if maybe_page.is_none() {
1009 book_state.end.saturating_inc();
1010 book_state.count.saturating_inc();
1011 maybe_page = Some(Page::from_message::<T>(message));
1012 }
1013
1014 book_state.message_count.saturating_inc();
1016 book_state
1017 .size
1018 .saturating_accrue(message.len() as u64);
1020 }
1021
1022 if let Some(page) = maybe_page {
1024 Pages::<T>::insert(origin, book_state.end - 1, page);
1025 }
1026
1027 if book_state.ready_neighbours.is_none() {
1029 match Self::ready_ring_knit(origin) {
1030 Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
1031 Err(()) => {
1032 defensive!("Ring state invalid when knitting");
1033 },
1034 }
1035 }
1036
1037 BookStateFor::<T>::insert(origin, book_state);
1039 }
1040
1041 pub fn do_execute_overweight(
1046 origin: MessageOriginOf<T>,
1047 page_index: PageIndex,
1048 index: T::Size,
1049 weight_limit: Weight,
1050 ) -> Result<Weight, Error<T>> {
1051 match with_service_mutex(|| {
1052 Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
1053 }) {
1054 Err(()) => Err(Error::<T>::RecursiveDisallowed),
1055 Ok(x) => x,
1056 }
1057 }
1058
1059 fn do_execute_overweight_inner(
1061 origin: MessageOriginOf<T>,
1062 page_index: PageIndex,
1063 index: T::Size,
1064 weight_limit: Weight,
1065 ) -> Result<Weight, Error<T>> {
1066 let mut book_state = BookStateFor::<T>::get(&origin);
1067 ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);
1068
1069 let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
1070 let (pos, is_processed, payload) =
1071 page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
1072 let payload_len = payload.len() as u64;
1073 ensure!(
1074 page_index < book_state.begin ||
1075 (page_index == book_state.begin && pos < page.first.into() as usize),
1076 Error::<T>::Queued
1077 );
1078 ensure!(!is_processed, Error::<T>::AlreadyProcessed);
1079 use MessageExecutionStatus::*;
1080 let mut weight_counter = WeightMeter::with_limit(weight_limit);
1081 match Self::process_message_payload(
1082 origin.clone(),
1083 page_index,
1084 index,
1085 payload,
1086 &mut weight_counter,
1087 Weight::MAX,
1088 ) {
1091 Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
1092 StackLimitReached | Unprocessable { permanent: false } =>
1093 Err(Error::<T>::TemporarilyUnprocessable),
1094 Unprocessable { permanent: true } | Processed => {
1095 page.note_processed_at_pos(pos);
1096 book_state.message_count.saturating_dec();
1097 book_state.size.saturating_reduce(payload_len);
1098 let page_weight = if page.remaining.is_zero() {
1099 debug_assert!(
1100 page.remaining_size.is_zero(),
1101 "no messages remaining; no space taken; qed"
1102 );
1103 Pages::<T>::remove(&origin, page_index);
1104 debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
1105 book_state.count.saturating_dec();
1106 T::WeightInfo::execute_overweight_page_removed()
1107 } else {
1110 Pages::<T>::insert(&origin, page_index, page);
1111 T::WeightInfo::execute_overweight_page_updated()
1112 };
1113 BookStateFor::<T>::insert(&origin, &book_state);
1114 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1115 Ok(weight_counter.consumed().saturating_add(page_weight))
1116 },
1117 }
1118 }
1119
1120 fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1122 match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
1123 Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
1124 Ok(x) => x,
1125 }
1126 }
1127
1128 fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1130 let mut book_state = BookStateFor::<T>::get(origin);
1131 ensure!(page_index < book_state.begin, Error::<T>::NotReapable);
1134
1135 let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;
1136
1137 let reapable = page.remaining.is_zero();
1139
1140 let cullable = || {
1142 let total_pages = book_state.count;
1143 let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);
1144
1145 let stale_pages = total_pages - ready_pages;
1153
1154 let max_stale = T::MaxStale::get();
1158
1159 let overflow = match stale_pages.checked_sub(max_stale + 1) {
1162 Some(x) => x + 1,
1163 None => return false,
1164 };
1165
1166 let backlog = (max_stale * max_stale / overflow).max(max_stale);
1175
1176 let watermark = book_state.begin.saturating_sub(backlog);
1177 page_index < watermark
1178 };
1179 ensure!(reapable || cullable(), Error::<T>::NotReapable);
1180
1181 Pages::<T>::remove(origin, page_index);
1182 debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
1183 book_state.count.saturating_dec();
1184 book_state.message_count.saturating_reduce(page.remaining.into() as u64);
1185 book_state.size.saturating_reduce(page.remaining_size.into() as u64);
1186 BookStateFor::<T>::insert(origin, &book_state);
1187 T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
1188 Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });
1189
1190 Ok(())
1191 }
1192
1193 fn service_queue(
1197 origin: MessageOriginOf<T>,
1198 weight: &mut WeightMeter,
1199 overweight_limit: Weight,
1200 ) -> (bool, Option<MessageOriginOf<T>>) {
1201 use PageExecutionStatus::*;
1202 if weight
1203 .try_consume(
1204 T::WeightInfo::service_queue_base()
1205 .saturating_add(T::WeightInfo::ready_ring_unknit()),
1206 )
1207 .is_err()
1208 {
1209 return (false, None)
1210 }
1211
1212 let mut book_state = BookStateFor::<T>::get(&origin);
1213 let mut total_processed = 0;
1214 if T::QueuePausedQuery::is_paused(&origin) {
1215 let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1216 return (false, next_ready)
1217 }
1218
1219 while book_state.end > book_state.begin {
1220 let (processed, status) =
1221 Self::service_page(&origin, &mut book_state, weight, overweight_limit);
1222 total_processed.saturating_accrue(processed);
1223 match status {
1224 Bailed | NoProgress => break,
1226 NoMore => (),
1228 };
1229 book_state.begin.saturating_inc();
1230 }
1231 let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1232 if book_state.begin >= book_state.end {
1233 if let Some(neighbours) = book_state.ready_neighbours.take() {
1235 Self::ready_ring_unknit(&origin, neighbours);
1236 } else if total_processed > 0 {
1237 defensive!("Freshly processed queue must have been ready");
1238 }
1239 }
1240 BookStateFor::<T>::insert(&origin, &book_state);
1241 if total_processed > 0 {
1242 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1243 }
1244 (total_processed > 0, next_ready)
1245 }
1246
1247 fn service_page(
1251 origin: &MessageOriginOf<T>,
1252 book_state: &mut BookStateOf<T>,
1253 weight: &mut WeightMeter,
1254 overweight_limit: Weight,
1255 ) -> (u32, PageExecutionStatus) {
1256 use PageExecutionStatus::*;
1257 if weight
1258 .try_consume(
1259 T::WeightInfo::service_page_base_completion()
1260 .max(T::WeightInfo::service_page_base_no_completion()),
1261 )
1262 .is_err()
1263 {
1264 return (0, Bailed)
1265 }
1266
1267 let page_index = book_state.begin;
1268 let mut page = match Pages::<T>::get(origin, page_index) {
1269 Some(p) => p,
1270 None => {
1271 defensive!("message-queue: referenced page not found");
1272 return (0, NoMore)
1273 },
1274 };
1275
1276 let mut total_processed = 0;
1277
1278 let status = loop {
1280 use ItemExecutionStatus::*;
1281 match Self::service_page_item(
1282 origin,
1283 page_index,
1284 book_state,
1285 &mut page,
1286 weight,
1287 overweight_limit,
1288 ) {
1289 Bailed => break PageExecutionStatus::Bailed,
1290 NoItem => break PageExecutionStatus::NoMore,
1291 NoProgress => break PageExecutionStatus::NoProgress,
1292 Executed(true) => total_processed.saturating_inc(),
1294 Executed(false) => (),
1295 }
1296 };
1297
1298 if page.is_complete() {
1299 debug_assert!(status != Bailed, "we never bail if a page became complete");
1300 Pages::<T>::remove(origin, page_index);
1301 debug_assert!(book_state.count > 0, "completing a page implies there are pages");
1302 book_state.count.saturating_dec();
1303 } else {
1304 Pages::<T>::insert(origin, page_index, page);
1305 }
1306 (total_processed, status)
1307 }
1308
1309 pub(crate) fn service_page_item(
1311 origin: &MessageOriginOf<T>,
1312 page_index: PageIndex,
1313 book_state: &mut BookStateOf<T>,
1314 page: &mut PageOf<T>,
1315 weight: &mut WeightMeter,
1316 overweight_limit: Weight,
1317 ) -> ItemExecutionStatus {
1318 use MessageExecutionStatus::*;
1319 if page.is_complete() {
1322 return ItemExecutionStatus::NoItem
1323 }
1324 if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
1325 return ItemExecutionStatus::Bailed
1326 }
1327
1328 let payload = &match page.peek_first() {
1329 Some(m) => m,
1330 None => return ItemExecutionStatus::NoItem,
1331 }[..];
1332 let payload_len = payload.len() as u64;
1333
1334 Pages::<T>::insert(origin, page_index, &*page);
1336 BookStateFor::<T>::insert(origin, &*book_state);
1337
1338 let res = Self::process_message_payload(
1339 origin.clone(),
1340 page_index,
1341 page.first_index,
1342 payload,
1343 weight,
1344 overweight_limit,
1345 );
1346
1347 *book_state = BookStateFor::<T>::get(origin);
1349 if let Some(new_page) = Pages::<T>::get(origin, page_index) {
1350 *page = new_page;
1351 } else {
1352 defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
1353 return ItemExecutionStatus::NoItem
1354 };
1355
1356 let is_processed = match res {
1357 InsufficientWeight => return ItemExecutionStatus::Bailed,
1358 Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
1359 Processed | Unprocessable { permanent: true } | StackLimitReached => true,
1360 Overweight => false,
1361 };
1362
1363 if is_processed {
1364 book_state.message_count.saturating_dec();
1365 book_state.size.saturating_reduce(payload_len as u64);
1366 }
1367 page.skip_first(is_processed);
1368 ItemExecutionStatus::Executed(is_processed)
1369 }
1370
/// Check the consistency of the pallet's storage.
///
/// Verifies that
/// - all keys of `BookStateFor` and `Pages` have decodable values,
/// - every book has `end >= begin`, plausible counter values and no more ready pages than
///   total pages,
/// - every queue reachable from the service head has messages, a non-zero size, ready pages
///   and ring neighbours,
/// - every ready page of such a queue exists and its `remaining` counter matches the number
///   of unprocessed items found on its heap.
///
/// Note: the ready ring is walked by repeatedly calling `bump_service_head`, which writes
/// `ServiceHead` while doing so.
///
/// # Errors
///
/// Returns a descriptive error for the first violated invariant.
#[cfg(any(test, feature = "try-runtime", feature = "std"))]
pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
    // Values that fail to decode are skipped by `iter_values` but not by `iter_keys`.
    ensure!(
        BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
        "Memory Corruption in BookStateFor"
    );
    ensure!(
        Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
        "Memory Corruption in Pages"
    );

    for book in BookStateFor::<T>::iter_values() {
        ensure!(book.end >= book.begin, "Invariant");
        ensure!(book.end < 1 << 30, "Likely overflow or corruption");
        ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
        ensure!(book.size < 1 << 30, "Likely overflow or corruption");
        ensure!(book.count < 1 << 30, "Likely overflow or corruption");

        let fp: QueueFootprint = book.into();
        ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
    }

    // Nothing more to check if no queue is ready.
    let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };

    while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
        ensure!(
            BookStateFor::<T>::contains_key(&head),
            "Service head must point to an existing book"
        );

        let head_book_state = BookStateFor::<T>::get(&head);
        ensure!(
            head_book_state.message_count > 0,
            "There must be some messages if in ReadyRing"
        );
        ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
        ensure!(
            head_book_state.end > head_book_state.begin,
            "End > Begin if unprocessed messages exists"
        );
        // Bind the neighbours once instead of unwrapping them repeatedly.
        let Some(neighbours) = head_book_state.ready_neighbours.as_ref() else {
            return Err("There must be neighbours if in ReadyRing".into())
        };

        if neighbours.next == head {
            ensure!(neighbours.prev == head, "Can only happen if only queue in ReadyRing");
        }

        for page_index in head_book_state.begin..head_book_state.end {
            // Report a missing page as an error instead of panicking.
            let page = Pages::<T>::get(&head, page_index)
                .ok_or("Every page in `begin..end` of a ready queue must exist")?;
            let remaining_messages = page.remaining;
            let mut counted_remaining_messages: u32 = 0;
            ensure!(
                remaining_messages > 0.into(),
                "These must be some messages that have not been processed yet!"
            );

            // Count the unprocessed items by walking the heap until it ends.
            for i in 0..u32::MAX {
                if let Some((_, processed, _)) = page.peek_index(i as usize) {
                    if !processed {
                        counted_remaining_messages += 1;
                    }
                } else {
                    break
                }
            }

            ensure!(
                remaining_messages.into() == counted_remaining_messages,
                "Memory Corruption"
            );
        }

        // Stop once the ring has been walked completely.
        if neighbours.next == starting_origin {
            break
        }
    }
    Ok(())
}
1477
/// Render all queues with their pages and messages as a human readable string.
///
/// Each queue starts with a `queue <origin>:` line followed by one line per page, ordered by
/// page index. The page at the queue's `begin` is prefixed with `>`. Messages are printed as
/// lossily UTF-8 decoded strings and are prefixed with `*` if they are marked as processed.
///
/// The pages are only read; their cursors and flags are not modified while printing, so the
/// `*` markers reflect exactly what is in storage.
#[cfg(feature = "std")]
pub fn debug_info() -> String {
    let mut info = String::new();
    for (origin, book_state) in BookStateFor::<T>::iter() {
        let mut queue = format!("queue {:?}:\n", &origin);
        let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
        pages.sort_by(|(a, _), (b, _)| a.cmp(b));
        for (page_index, page) in pages.into_iter() {
            let marker = if book_state.begin == page_index { ">" } else { " " };
            let mut page_info = format!(
                "{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
                marker, page_index, page.first, page.last, page.remaining
            );
            // Walk the heap by item index until no further item can be decoded.
            for i in 0..u32::MAX {
                let Some((_, processed, message)) =
                    page.peek_index(i.try_into().expect("std-only code"))
                else {
                    break
                };
                let msg = String::from_utf8_lossy(message);
                if processed {
                    page_info.push('*');
                }
                page_info.push_str(&format!("{:?}, ", msg));
            }
            page_info.push_str("]\n");
            queue.push_str(&page_info);
        }
        info.push_str(&queue);
    }
    info
}
1527
1528 fn process_message_payload(
1536 origin: MessageOriginOf<T>,
1537 page_index: PageIndex,
1538 message_index: T::Size,
1539 message: &[u8],
1540 meter: &mut WeightMeter,
1541 overweight_limit: Weight,
1542 ) -> MessageExecutionStatus {
1543 let mut id = sp_io::hashing::blake2_256(message);
1544 use ProcessMessageError::*;
1545 let prev_consumed = meter.consumed();
1546
1547 let transaction =
1548 storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
1549 let res =
1550 T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
1551 match &res {
1552 Ok(_) => TransactionOutcome::Commit(Ok(res)),
1553 Err(_) => TransactionOutcome::Rollback(Ok(res)),
1554 }
1555 });
1556
1557 let transaction = match transaction {
1558 Ok(result) => result,
1559 _ => {
1560 defensive!(
1561 "Error occurred processing message, storage changes will be rolled back"
1562 );
1563 return MessageExecutionStatus::Unprocessable { permanent: true }
1564 },
1565 };
1566
1567 match transaction {
1568 Err(Overweight(w)) if w.any_gt(overweight_limit) => {
1569 Self::deposit_event(Event::<T>::OverweightEnqueued {
1571 id,
1572 origin,
1573 page_index,
1574 message_index,
1575 });
1576 MessageExecutionStatus::Overweight
1577 },
1578 Err(Overweight(_)) => {
1579 MessageExecutionStatus::InsufficientWeight
1582 },
1583 Err(Yield) => {
1584 MessageExecutionStatus::Unprocessable { permanent: false }
1586 },
1587 Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
1588 Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1590 MessageExecutionStatus::Unprocessable { permanent: true }
1591 },
1592 Err(error @ StackLimitReached) => {
1593 Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1594 MessageExecutionStatus::StackLimitReached
1595 },
1596 Ok(success) => {
1597 let weight_used = meter.consumed().saturating_sub(prev_consumed);
1599 Self::deposit_event(Event::<T>::Processed {
1600 id: id.into(),
1601 origin,
1602 weight_used,
1603 success,
1604 });
1605 MessageExecutionStatus::Processed
1606 },
1607 }
1608 }
1609
1610 fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
1611 let mut weight = WeightMeter::with_limit(weight_limit);
1612
1613 let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
1615 if matches!(context, ServiceQueuesContext::OnInitialize) {
1616 defensive!("Not enough weight to service a single message.");
1617 }
1618 Weight::zero()
1619 });
1620
1621 match with_service_mutex(|| {
1622 let mut next = match Self::bump_service_head(&mut weight) {
1623 Some(h) => h,
1624 None => return weight.consumed(),
1625 };
1626 let mut last_no_progress = None;
1630
1631 loop {
1632 let (progressed, n) =
1633 Self::service_queue(next.clone(), &mut weight, overweight_limit);
1634 next = match n {
1635 Some(n) =>
1636 if !progressed {
1637 if last_no_progress == Some(n.clone()) {
1638 break
1639 }
1640 if last_no_progress.is_none() {
1641 last_no_progress = Some(next.clone())
1642 }
1643 n
1644 } else {
1645 last_no_progress = None;
1646 n
1647 },
1648 None => break,
1649 }
1650 }
1651 weight.consumed()
1652 }) {
1653 Err(()) => weight.consumed(),
1654 Ok(w) => w,
1655 }
1656 }
1657}
1658
1659impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pallet<T> {
1660 fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
1661 Pallet::<T>::set_service_head(weight, origin)
1662 }
1663}
1664
1665pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
1667 environmental::environmental!(token: Option<()>);
1669
1670 token::using_once(&mut Some(()), || {
1671 let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;
1673
1674 defer! {
1676 token::with(|t| {
1677 *t = Some(hold);
1678 });
1679 }
1680
1681 Ok(f())
1682 })
1683}
1684
1685pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
1687impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
1688 fn get() -> u32 {
1689 T::max_encoded_len() as u32
1690 }
1691}
1692
1693pub struct MaxMessageLen<Origin, Size, HeapSize>(
1695 core::marker::PhantomData<(Origin, Size, HeapSize)>,
1696);
1697impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
1698 for MaxMessageLen<Origin, Size, HeapSize>
1699{
1700 fn get() -> u32 {
1701 (HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
1702 }
1703}
1704
1705pub type MaxMessageLenOf<T> =
1707 MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
1708pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
1710pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
1712pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
1714pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
1716pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;
1718
1719pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
1722impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
1723 fn get() -> u32 {
1724 T::get().into()
1725 }
1726}
1727
1728impl<T: Config> ServiceQueues for Pallet<T> {
1729 type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);
1730
1731 fn service_queues(weight_limit: Weight) -> Weight {
1732 Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
1733 }
1734
1735 fn execute_overweight(
1739 weight_limit: Weight,
1740 (message_origin, page, index): Self::OverweightMessageAddress,
1741 ) -> Result<Weight, ExecuteOverweightError> {
1742 let mut weight = WeightMeter::with_limit(weight_limit);
1743 if weight
1744 .try_consume(
1745 T::WeightInfo::execute_overweight_page_removed()
1746 .max(T::WeightInfo::execute_overweight_page_updated()),
1747 )
1748 .is_err()
1749 {
1750 return Err(ExecuteOverweightError::InsufficientWeight)
1751 }
1752
1753 Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
1754 |e| match e {
1755 Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
1756 Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
1757 Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
1758 Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued =>
1759 ExecuteOverweightError::NotFound,
1760 Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
1761 _ => ExecuteOverweightError::Other,
1762 },
1763 )
1764 }
1765}
1766
1767impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
1768 type MaxMessageLen =
1769 MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;
1770
1771 fn enqueue_message(
1772 message: BoundedSlice<u8, Self::MaxMessageLen>,
1773 origin: <T::MessageProcessor as ProcessMessage>::Origin,
1774 ) {
1775 Self::do_enqueue_messages(&origin, [message].into_iter());
1776 let book_state = BookStateFor::<T>::get(&origin);
1777 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1778 }
1779
1780 fn enqueue_messages<'a>(
1781 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
1782 origin: <T::MessageProcessor as ProcessMessage>::Origin,
1783 ) {
1784 Self::do_enqueue_messages(&origin, messages);
1785 let book_state = BookStateFor::<T>::get(&origin);
1786 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1787 }
1788
1789 fn sweep_queue(origin: MessageOriginOf<T>) {
1790 if !BookStateFor::<T>::contains_key(&origin) {
1791 return
1792 }
1793 let mut book_state = BookStateFor::<T>::get(&origin);
1794 book_state.begin = book_state.end;
1795 if let Some(neighbours) = book_state.ready_neighbours.take() {
1796 Self::ready_ring_unknit(&origin, neighbours);
1797 }
1798 BookStateFor::<T>::insert(&origin, &book_state);
1799 }
1800
1801 fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
1802 BookStateFor::<T>::get(&origin).into()
1803 }
1804}