1#![deny(missing_docs)]
198#![cfg_attr(not(feature = "std"), no_std)]
199
200mod benchmarking;
201mod integration_test;
202mod mock;
203pub mod mock_helpers;
204mod tests;
205pub mod weights;
206
207extern crate alloc;
208
209use alloc::vec::Vec;
210use codec::{Codec, ConstEncodedLen, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
211use core::{fmt::Debug, ops::Deref};
212use pezframe_support::{
213 defensive,
214 pezpallet_prelude::*,
215 traits::{
216 BatchesFootprints, Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
217 ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
218 QueueFootprintQuery, QueuePausedQuery, ServiceQueues,
219 },
220 BoundedSlice, CloneNoBound, DefaultNoBound,
221};
222use pezframe_system::pezpallet_prelude::*;
223pub use pezpallet::*;
224use pezsp_arithmetic::traits::{BaseArithmetic, Unsigned};
225use pezsp_core::{defer, H256};
226use pezsp_runtime::{
227 traits::{One, Zero},
228 SaturatedConversion, Saturating, TransactionOutcome,
229};
230use pezsp_weights::WeightMeter;
231use scale_info::TypeInfo;
232pub use weights::WeightInfo;
233
/// The index of a page inside a queue's book of pages.
type PageIndex = u32;
236
/// The header that precedes every message payload on a page's heap.
///
/// The header has a constant encoded length (see the `ConstEncodedLen` impl
/// below), which allows heap offsets to be computed without decoding payloads.
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
 /// The length in bytes of the payload that directly follows this header.
 payload_len: Size,
 /// Whether this message was already processed; it is then only retained on
 /// the heap until the whole page can be removed.
 is_processed: bool,
}
246
/// `ItemHeader` has a constant encoded length whenever its `Size` type does
/// (the trailing `bool` always encodes to exactly one byte).
impl<Size: ConstEncodedLen> ConstEncodedLen for ItemHeader<Size> {}

/// A page of messages: a bounded heap of tightly packed `(ItemHeader, payload)`
/// pairs plus bookkeeping about the unprocessed region.
#[derive(
 CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
 /// Messages on this page that remain to be processed.
 remaining: Size,
 /// The combined payload size in bytes of all remaining messages.
 remaining_size: Size,
 /// The number of items that precede the item at heap offset `first`
 /// (i.e. the page-local index of the first unprocessed item).
 first_index: Size,
 /// Heap offset of the header of the first unprocessed item.
 first: Size,
 /// Heap offset of the header of the last item appended to this page.
 last: Size,
 /// The raw storage for headers and payloads, bounded by `HeapSize` bytes.
 heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}
273
impl<
 Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
 HeapSize: Get<Size>,
 > Page<Size, HeapSize>
where
 ItemHeader<Size>: ConstEncodedLen,
{
 /// Create a [`Page`] holding a single `message`.
 fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
  let payload_len = message.len();
  let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
  let payload_len = payload_len.saturated_into();
  let header = ItemHeader::<Size> { payload_len, is_processed: false };

  // Heap layout: the encoded header directly followed by the raw payload.
  let mut heap = Vec::with_capacity(data_len);
  header.using_encoded(|h| heap.extend_from_slice(h));
  heap.extend_from_slice(message.deref());

  Page {
   remaining: One::one(),
   remaining_size: payload_len,
   first_index: Zero::zero(),
   first: Zero::zero(),
   last: Zero::zero(),
   // `message` is bounded by `MaxMessageLenOf`, so this cannot truncate.
   heap: BoundedVec::defensive_truncate_from(heap),
  }
 }

 /// The heap position at which a new item would be appended (i.e. the
 /// current end of the heap).
 fn heap_pos(&self) -> usize {
  self.heap.len()
 }

 /// Check whether an item with a `message_len` byte payload could be placed
 /// at heap position `pos` without exceeding `HeapSize`.
 ///
 /// On success returns the position directly after the would-be item.
 fn can_append_message_at(pos: usize, message_len: usize) -> Result<usize, ()> {
  let header_size = ItemHeader::<Size>::max_encoded_len();
  let data_len = header_size.saturating_add(message_len);
  let heap_size = HeapSize::get().into() as usize;
  let new_pos = pos.saturating_add(data_len);
  if new_pos <= heap_size {
   Ok(new_pos)
  } else {
   Err(())
  }
 }

 /// Try to append one `message` to this page; errors without modifying the
 /// page if the message does not fit.
 fn try_append_message<T: Config>(
  &mut self,
  message: BoundedSlice<u8, MaxMessageLenOf<T>>,
 ) -> Result<(), ()> {
  let pos = self.heap_pos();
  Self::can_append_message_at(pos, message.len())?;
  let payload_len = message.len().saturated_into();
  let header = ItemHeader::<Size> { payload_len, is_processed: false };

  // Take the heap out as a plain `Vec`, extend it, then bound it again.
  let mut heap = core::mem::take(&mut self.heap).into_inner();
  header.using_encoded(|h| heap.extend_from_slice(h));
  heap.extend_from_slice(message.deref());
  // Cannot truncate: `can_append_message_at` above proved that it fits.
  self.heap = BoundedVec::defensive_truncate_from(heap);
  self.last = pos.saturated_into();
  self.remaining.saturating_inc();
  self.remaining_size.saturating_accrue(payload_len);
  Ok(())
 }

 /// Peek the payload of the first not-yet-skipped message on this page
 /// without consuming it.
 ///
 /// Returns `None` when the cursor moved past the last item, or (with a
 /// defensive report) when the heap is corrupted.
 fn peek_first(&self) -> Option<BoundedSlice<'_, u8, IntoU32<HeapSize, Size>>> {
  if self.first > self.last {
   return None;
  }
  // Clamp defensively in case `first` points past the end of the heap.
  let f = (self.first.into() as usize).min(self.heap.len());
  let mut item_slice = &self.heap[f..];
  if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
   let payload_len = h.payload_len.into() as usize;
   if payload_len <= item_slice.len() {
    return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]));
   }
  }
  defensive!("message-queue: heap corruption");
  None
 }

 /// Advance the `first` cursor past the current first item, marking that
 /// item as processed if `is_processed` is true.
 fn skip_first(&mut self, is_processed: bool) {
  let f = (self.first.into() as usize).min(self.heap.len());
  if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
   if is_processed && !h.is_processed {
    // Persist the processed flag in place and update the counters.
    h.is_processed = true;
    h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
    self.remaining.saturating_dec();
    self.remaining_size.saturating_reduce(h.payload_len);
   }
   // Move the cursor past this item's header and payload.
   self.first
    .saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
   self.first.saturating_accrue(h.payload_len);
   self.first_index.saturating_inc();
  }
 }

 /// Return `(heap_offset, is_processed, payload)` of the item at page-local
 /// `index`, or `None` if out of bounds or the heap is corrupted.
 ///
 /// Walks the heap item by item, so it runs in `O(index)`.
 fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
  let mut pos = 0;
  let mut item_slice = &self.heap[..];
  let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
  // Skip over the first `index` items.
  for _ in 0..index {
   let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
   let item_len = h.payload_len.into() as usize;
   if item_slice.len() < item_len {
    return None;
   }
   item_slice = &item_slice[item_len..];
   pos.saturating_accrue(header_len.saturating_add(item_len));
  }
  let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
  if item_slice.len() < h.payload_len.into() as usize {
   return None;
  }
  item_slice = &item_slice[..h.payload_len.into() as usize];
  Some((pos, h.is_processed, item_slice))
 }

 /// Mark the item whose header begins at heap offset `pos` as processed and
 /// update the counters. No-op if it is already marked processed or the
 /// header cannot be decoded.
 fn note_processed_at_pos(&mut self, pos: usize) {
  if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
   if !h.is_processed {
    h.is_processed = true;
    h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
    self.remaining.saturating_dec();
    self.remaining_size.saturating_reduce(h.payload_len);
   }
  }
 }

 /// Whether every message on this page was processed.
 fn is_complete(&self) -> bool {
  self.remaining.is_zero()
 }
}
425
/// A neighbour entry in the circular doubly-linked "ready ring" of queues.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
 /// The previous queue in the ring.
 prev: MessageOrigin,
 /// The next queue in the ring.
 next: MessageOrigin,
}
434
/// The state of a queue as represented by a book of its pages.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
pub struct BookState<MessageOrigin> {
 /// The first page with some unprocessed items in it.
 begin: PageIndex,
 /// One more than the last page with some unprocessed items in it.
 end: PageIndex,
 /// The number of pages currently stored for this queue.
 count: PageIndex,
 /// `Some` iff this queue is a member of the ready ring; holds its
 /// neighbours in that ring.
 ready_neighbours: Option<Neighbours<MessageOrigin>>,
 /// The total number of unprocessed messages across all pages.
 message_count: u64,
 /// The total size in bytes of all unprocessed messages across all pages.
 size: u64,
}
460
461impl<MessageOrigin> Default for BookState<MessageOrigin> {
462 fn default() -> Self {
463 Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
464 }
465}
466
467impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
468 fn from(book: BookState<MessageOrigin>) -> Self {
469 QueueFootprint {
470 pages: book.count,
471 ready_pages: book.end.defensive_saturating_sub(book.begin),
472 storage: Footprint { count: book.message_count, size: book.size },
473 }
474 }
475}
476
/// Hook that is called whenever the footprint of a queue changes.
pub trait OnQueueChanged<Id> {
 /// Note that the queue `id` now has the footprint `fp`.
 fn on_queue_changed(id: Id, fp: QueueFootprint);
}
482
483impl<Id> OnQueueChanged<Id> for () {
484 fn on_queue_changed(_: Id, _: QueueFootprint) {}
485}
486
/// Allows forcing the service head (the next queue to be serviced) to a
/// specific queue.
pub trait ForceSetHead<O> {
 /// Try to set the service head to `origin`, consuming from `weight`.
 ///
 /// Returns `Ok(true)` if the head was set, `Ok(false)` if it was not
 /// (presumably because the queue is not eligible — confirm with the
 /// implementor), and `Err(())` on failure such as insufficient weight.
 fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}
497
/// The message-queue pezpallet: config, events, errors, storage and calls.
#[pezframe_support::pezpallet]
pub mod pezpallet {
 use super::*;

 /// The pezpallet type; implements the queue-servicing logic below.
 #[pezpallet::pezpallet]
 pub struct Pezpallet<T>(_);

 /// Configuration trait of this pezpallet.
 #[pezpallet::config]
 pub trait Config: pezframe_system::Config {
  /// The overarching runtime event type.
  #[allow(deprecated)]
  type RuntimeEvent: From<Event<Self>>
   + IsType<<Self as pezframe_system::Config>::RuntimeEvent>;

  /// Weight information for extrinsics and internals of this pezpallet.
  type WeightInfo: WeightInfo;

  /// Processor for dequeued messages; its `Origin` identifies the queues.
  type MessageProcessor: ProcessMessage;

  /// Type used for page-internal sizes, offsets and item counts.
  type Size: BaseArithmetic
   + Unsigned
   + Copy
   + Into<u32>
   + Member
   + Encode
   + Decode
   + MaxEncodedLen
   + ConstEncodedLen
   + TypeInfo
   + Default;

  /// Handler that is notified when the footprint of a queue changes.
  type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

  /// Queried to check whether a given queue is currently paused.
  type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

  /// The maximal size in bytes of a page's message heap.
  #[pezpallet::constant]
  type HeapSize: Get<Self::Size>;

  /// The maximum number of stale pages before culling is permitted
  /// (see `do_reap_page_inner`).
  #[pezpallet::constant]
  type MaxStale: Get<u32>;

  /// The weight budget for servicing queues in `on_initialize`;
  /// `None` disables servicing there.
  #[pezpallet::constant]
  type ServiceWeight: Get<Option<Weight>>;

  /// The weight budget for servicing queues in `on_idle`;
  /// `None` disables servicing there.
  #[pezpallet::constant]
  type IdleMaxServiceWeight: Get<Option<Weight>>;
 }

 #[pezpallet::event]
 #[pezpallet::generate_deposit(pub(super) fn deposit_event)]
 pub enum Event<T: Config> {
  /// Message discarded due to an error reported by the `MessageProcessor`.
  ProcessingFailed {
   /// The `blake2_256` hash of the message (possibly overridden by the
   /// processor).
   id: H256,
   /// The queue of the message.
   origin: MessageOriginOf<T>,
   /// The error that resulted in the failure.
   error: ProcessMessageError,
  },
  /// Message was processed.
  Processed {
   /// The `blake2_256` hash of the message.
   id: H256,
   /// The queue of the message.
   origin: MessageOriginOf<T>,
   /// How much weight was used to process the message.
   weight_used: Weight,
   /// Whether the processor reported success.
   success: bool,
  },
  /// Message placed in the overweight queue for later manual execution.
  OverweightEnqueued {
   /// The `blake2_256` hash of the message.
   id: [u8; 32],
   /// The queue of the message.
   origin: MessageOriginOf<T>,
   /// The page index of the message.
   page_index: PageIndex,
   /// The index of the message within its page.
   message_index: T::Size,
  },
  /// A page was reaped.
  PageReaped {
   /// The queue of the reaped page.
   origin: MessageOriginOf<T>,
   /// The index of the reaped page.
   index: PageIndex,
  },
 }

 #[pezpallet::error]
 pub enum Error<T> {
  /// Page is not reapable: it has unprocessed items and is not stale enough.
  NotReapable,
  /// The referenced page does not exist.
  NoPage,
  /// The referenced message could not be found.
  NoMessage,
  /// The message was already processed and cannot be processed again.
  AlreadyProcessed,
  /// The message is still queued for automatic execution.
  Queued,
  /// The given weight limit is insufficient to execute the message.
  InsufficientWeight,
  /// The message is temporarily unprocessable; retrying later may succeed.
  TemporarilyUnprocessable,
  /// The queue is paused; no message can be executed from it at present.
  QueuePaused,
  /// Another servicing call is in progress; recursive entry is disallowed.
  RecursiveDisallowed,
 }

 /// The book state of every queue.
 #[pezpallet::storage]
 pub type BookStateFor<T: Config> =
  StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

 /// The queue at which servicing should begin; the entry point into the
 /// ready ring.
 #[pezpallet::storage]
 pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

 /// The map of page indices to pages, per queue.
 #[pezpallet::storage]
 pub type Pages<T: Config> = StorageDoubleMap<
  _,
  Twox64Concat,
  MessageOriginOf<T>,
  Twox64Concat,
  PageIndex,
  Page<T::Size, T::HeapSize>,
  OptionQuery,
 >;

 #[pezpallet::hooks]
 impl<T: Config> Hooks<BlockNumberFor<T>> for Pezpallet<T> {
  fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
   // Service queues at block start, bounded by `ServiceWeight` (if any).
   if let Some(weight_limit) = T::ServiceWeight::get() {
    Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
   } else {
    Weight::zero()
   }
  }

  fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
   // Use leftover block weight, capped by `IdleMaxServiceWeight` (if any).
   if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
    Self::service_queues_impl(
     weight_limit.min(remaining_weight),
     ServiceQueuesContext::OnIdle,
    )
   } else {
    Weight::zero()
   }
  }

  #[cfg(feature = "try-runtime")]
  fn try_state(_: BlockNumberFor<T>) -> Result<(), pezsp_runtime::TryRuntimeError> {
   Self::do_try_state()
  }

  /// Check all compile-time assumptions about the [`Config`].
  #[cfg(test)]
  fn integrity_test() {
   Self::do_integrity_test().expect("Pezpallet config is valid; qed")
  }
 }

 #[pezpallet::call]
 impl<T: Config> Pezpallet<T> {
  /// Remove a page which has no more messages remaining to be processed or
  /// is stale.
  #[pezpallet::call_index(0)]
  #[pezpallet::weight(T::WeightInfo::reap_page())]
  pub fn reap_page(
   origin: OriginFor<T>,
   message_origin: MessageOriginOf<T>,
   page_index: PageIndex,
  ) -> DispatchResult {
   ensure_signed(origin)?;
   Self::do_reap_page(&message_origin, page_index)
  }

  /// Execute an overweight message.
  ///
  /// - `origin`: must be `Signed`.
  /// - `message_origin`: the queue in which the message sits.
  /// - `page`: the page within that queue.
  /// - `index`: the index of the message within the page.
  /// - `weight_limit`: the maximum weight that may be consumed executing it.
  #[pezpallet::call_index(1)]
  #[pezpallet::weight(
   T::WeightInfo::execute_overweight_page_updated().max(
    T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
  )]
  pub fn execute_overweight(
   origin: OriginFor<T>,
   message_origin: MessageOriginOf<T>,
   page: PageIndex,
   index: T::Size,
   weight_limit: Weight,
  ) -> DispatchResultWithPostInfo {
   ensure_signed(origin)?;
   let actual_weight =
    Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
   Ok(Some(actual_weight).into())
  }
 }
}
764
/// The status of servicing one page of a queue.
#[derive(PartialEq, Debug)]
enum PageExecutionStatus {
 /// Servicing bailed since there was not enough weight remaining.
 Bailed,
 /// The page made no progress; a transient condition worth retrying later.
 NoProgress,
 /// No more items could be loaded from this page — it is done.
 NoMore,
}
781
/// The status of servicing a single item of a page.
#[derive(PartialEq, Debug)]
enum ItemExecutionStatus {
 /// Execution bailed since there was not enough weight remaining.
 Bailed,
 /// The item made no progress; a transient condition worth retrying later.
 NoProgress,
 /// There was no item to execute.
 NoItem,
 /// Whether executing the item resulted in it being processed
 /// (`false` e.g. when the message turned out to be overweight).
 Executed(bool),
}
798
/// The outcome of attempting to process one message payload.
#[derive(PartialEq)]
enum MessageExecutionStatus {
 /// There is currently not enough weight to process the message.
 InsufficientWeight,
 /// The message exceeds the overweight limit and was queued for later
 /// manual execution.
 Overweight,
 /// The message was processed (regardless of the processor's success flag).
 Processed,
 /// The message could not be processed; `permanent` says whether a retry
 /// could ever succeed.
 Unprocessable { permanent: bool },
 /// Processing hit the stack-depth limit; treated like a permanent error
 /// for servicing purposes but transient for manual execution.
 StackLimitReached,
}
818
/// The context from which queue servicing was invoked.
#[derive(PartialEq)]
enum ServiceQueuesContext {
 /// Invoked from the `on_idle` hook.
 OnIdle,
 /// Invoked from the `on_initialize` hook.
 OnInitialize,
 /// Invoked directly via the `ServiceQueues` trait.
 ServiceQueues,
}
830
831impl<T: Config> Pezpallet<T> {
 /// Insert `origin` into the ready ring and return its new neighbours.
 ///
 /// The ready ring is a circular doubly-linked list of all queues with
 /// unprocessed messages. `origin` is spliced in between the current tail and
 /// head; if the ring is empty it becomes the sole member and the new
 /// `ServiceHead`. Errors if the ring state is found to be corrupt.
 fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
  if let Some(head) = ServiceHead::<T>::get() {
   // Point the head's `prev` at the newcomer...
   let mut head_book_state = BookStateFor::<T>::get(&head);
   let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
   let tail = head_neighbours.prev;
   head_neighbours.prev = origin.clone();
   head_book_state.ready_neighbours = Some(head_neighbours);
   BookStateFor::<T>::insert(&head, head_book_state);

   // ...and the old tail's `next` at the newcomer as well.
   let mut tail_book_state = BookStateFor::<T>::get(&tail);
   let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
   tail_neighbours.next = origin.clone();
   tail_book_state.ready_neighbours = Some(tail_neighbours);
   BookStateFor::<T>::insert(&tail, tail_book_state);

   Ok(Neighbours { next: head, prev: tail })
  } else {
   // Empty ring: `origin` is now the only member and the head.
   ServiceHead::<T>::put(origin);
   Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
  }
 }
856
 /// Remove `origin` (with the given `neighbours`) from the ready ring,
 /// relinking its neighbours to each other and moving the `ServiceHead`
 /// forward if it pointed at `origin`.
 fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
  if origin == &neighbours.next {
   // `origin` points at itself, so it must be the only ring member:
   // removing it empties the ring entirely.
   debug_assert!(
    origin == &neighbours.prev,
    "unknitting from single item ring; outgoing must be only item"
   );
   ServiceHead::<T>::kill();
  } else {
   // Link `prev` and `next` directly to each other.
   BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
    if let Some(ref mut n) = book_state.ready_neighbours {
     n.prev = neighbours.prev.clone()
    }
   });
   BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
    if let Some(ref mut n) = book_state.ready_neighbours {
     n.next = neighbours.next.clone()
    }
   });
   // If the head pointed at the removed queue, advance it.
   if let Some(head) = ServiceHead::<T>::get() {
    if &head == origin {
     ServiceHead::<T>::put(neighbours.next);
    }
   } else {
    defensive!("`ServiceHead` must be some if there was a ready queue");
   }
  }
 }
885
 /// Advance the `ServiceHead` to the next queue in the ready ring and return
 /// the previous head.
 ///
 /// Returns `None` if there is no head or not enough weight remains in
 /// `weight` to pay for the bump.
 fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
  if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
   return None;
  }

  if let Some(head) = ServiceHead::<T>::get() {
   let mut head_book_state = BookStateFor::<T>::get(&head);
   if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
    ServiceHead::<T>::put(&head_neighbours.next);
    Some(head)
   } else {
    defensive!("The head must point to a queue in the ready ring");
    None
   }
  } else {
   None
  }
 }
907
 /// Try to set the `ServiceHead` to `queue`, consuming from `weight`.
 ///
 /// Returns `Ok(true)` if the head was set, `Ok(false)` if the queue is not
 /// in the ready ring (has no neighbours), and `Err(())` if not enough
 /// weight remained.
 fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
  if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
   return Err(());
  }

  // Only queues in the ready ring may become the head.
  if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
   ServiceHead::<T>::put(queue);
   Ok(true)
  } else {
   Ok(false)
  }
 }
921
 /// The maximal weight that the execution of a single message may consume.
 ///
 /// Derived from the larger (by `any_gt`) of `ServiceWeight` and
 /// `IdleMaxServiceWeight` minus the fixed per-message servicing overhead.
 /// If both constants are unset/zero, the caller-provided `limit` is used as
 /// the base instead. Returns `None` when the base cannot even cover the
 /// overhead.
 fn max_message_weight(limit: Weight) -> Option<Weight> {
  let service_weight = T::ServiceWeight::get().unwrap_or_default();
  let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();

  // NOTE: `any_gt`, not a total `max` — the two weight dimensions are
  // compared together, favouring `service_weight` on a partial win.
  let max_message_weight =
   if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };

  if max_message_weight.is_zero() {
   limit.checked_sub(&Self::single_msg_overhead())
  } else {
   max_message_weight.checked_sub(&Self::single_msg_overhead())
  }
 }
943
944 fn single_msg_overhead() -> Weight {
946 T::WeightInfo::bump_service_head()
947 .saturating_add(T::WeightInfo::service_queue_base())
948 .saturating_add(
949 T::WeightInfo::service_page_base_completion()
950 .max(T::WeightInfo::service_page_base_no_completion()),
951 )
952 .saturating_add(T::WeightInfo::service_page_item())
953 .saturating_add(T::WeightInfo::ready_ring_unknit())
954 }
955
 /// Check all compile-time configuration assumptions about [`Config`].
 ///
 /// Verifies that the heap can hold at least one message, that the servicing
 /// weight budgets can cover the per-message overhead, that neither budget
 /// exceeds the maximal block weight, and that the two budgets are mutually
 /// comparable (`all_gt` one way or equal).
 #[cfg(test)]
 fn do_integrity_test() -> Result<(), String> {
  ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");

  let max_block = T::BlockWeights::get().max_block;

  if let Some(service) = T::ServiceWeight::get() {
   // The budget must at least cover the fixed per-message overhead.
   if Self::max_message_weight(service).is_none() {
    return Err(format!(
     "ServiceWeight too low: {}. Must be at least {}",
     service,
     Self::single_msg_overhead(),
    ));
   }

   if service.any_gt(max_block) {
    return Err(format!(
     "ServiceWeight {service} is bigger than max block weight {max_block}"
    ));
   }
  }

  if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
   if on_idle.any_gt(max_block) {
    return Err(format!(
     "IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
    ));
   }
  }

  // The two budgets must be totally ordered (or equal) so that
  // `max_message_weight` picks a well-defined larger one.
  if let (Some(service_weight), Some(on_idle)) =
   (T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
  {
   if !(service_weight.all_gt(on_idle)
    || on_idle.all_gt(service_weight)
    || service_weight == on_idle)
   {
    return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into());
   }
  }

  Ok(())
 }
1002
 /// Enqueue `messages` onto the queue `origin`, filling the current last page
 /// first and opening new pages as needed, then knit the queue into the ready
 /// ring if it was not already a member.
 fn do_enqueue_messages<'a>(
  origin: &MessageOriginOf<T>,
  messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
 ) {
  let mut book_state = BookStateFor::<T>::get(origin);

  // Load the current last page (if any) so we can keep appending to it.
  let mut maybe_page = None;
  if book_state.end > book_state.begin {
   debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
   maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
    defensive!("Corruption: referenced page doesn't exist.");
    None
   });
  }

  for message in messages {
   // Try to append to the open page; on failure, flush it and open a
   // fresh page for this message.
   if let Some(mut page) = maybe_page {
    maybe_page = match page.try_append_message::<T>(message) {
     Ok(_) => Some(page),
     Err(_) => {
      Pages::<T>::insert(origin, book_state.end - 1, page);
      None
     },
    }
   }
   if maybe_page.is_none() {
    book_state.end.saturating_inc();
    book_state.count.saturating_inc();
    maybe_page = Some(Page::from_message::<T>(message));
   }

   // Account the message in the book totals.
   book_state.message_count.saturating_inc();
   book_state
    .size
    .saturating_accrue(message.len() as u64);
  }

  // Flush the still-open page.
  if let Some(page) = maybe_page {
   Pages::<T>::insert(origin, book_state.end - 1, page);
  }

  // A queue with fresh messages must be in the ready ring.
  if book_state.ready_neighbours.is_none() {
   match Self::ready_ring_knit(origin) {
    Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
    Err(()) => {
     defensive!("Ring state invalid when knitting");
    },
   }
  }

  BookStateFor::<T>::insert(origin, book_state);
 }
1065
1066 pub fn do_execute_overweight(
1071 origin: MessageOriginOf<T>,
1072 page_index: PageIndex,
1073 index: T::Size,
1074 weight_limit: Weight,
1075 ) -> Result<Weight, Error<T>> {
1076 match with_service_mutex(|| {
1077 Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
1078 }) {
1079 Err(()) => Err(Error::<T>::RecursiveDisallowed),
1080 Ok(x) => x,
1081 }
1082 }
1083
 /// Execute an overweight message without taking the service mutex; see
 /// `do_execute_overweight` for the guarded entry point.
 ///
 /// The message must already have been skipped over by normal servicing
 /// (i.e. lie before the current servicing cursor), must not be processed
 /// yet, and its queue must not be paused.
 fn do_execute_overweight_inner(
  origin: MessageOriginOf<T>,
  page_index: PageIndex,
  index: T::Size,
  weight_limit: Weight,
 ) -> Result<Weight, Error<T>> {
  let mut book_state = BookStateFor::<T>::get(&origin);
  ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

  let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
  let (pos, is_processed, payload) =
   page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
  let payload_len = payload.len() as u64;
  // Only messages behind the servicing cursor may be executed manually;
  // anything at or past it is still queued for automatic execution.
  ensure!(
   page_index < book_state.begin
    || (page_index == book_state.begin && pos < page.first.into() as usize),
   Error::<T>::Queued
  );
  ensure!(!is_processed, Error::<T>::AlreadyProcessed);
  use MessageExecutionStatus::*;
  let mut weight_counter = WeightMeter::with_limit(weight_limit);
  // `Weight::MAX` as overweight limit: manual execution never re-enqueues
  // the message as overweight.
  match Self::process_message_payload(
   origin.clone(),
   page_index,
   index,
   payload,
   &mut weight_counter,
   Weight::MAX,
  ) {
   Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
   StackLimitReached | Unprocessable { permanent: false } => {
    Err(Error::<T>::TemporarilyUnprocessable)
   },
   Unprocessable { permanent: true } | Processed => {
    // Permanent failures count as "done" just like successes.
    page.note_processed_at_pos(pos);
    book_state.message_count.saturating_dec();
    book_state.size.saturating_reduce(payload_len);
    let page_weight = if page.remaining.is_zero() {
     debug_assert!(
      page.remaining_size.is_zero(),
      "no messages remaining; no space taken; qed"
     );
     Pages::<T>::remove(&origin, page_index);
     debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
     book_state.count.saturating_dec();
     T::WeightInfo::execute_overweight_page_removed()
    } else {
     Pages::<T>::insert(&origin, page_index, page);
     T::WeightInfo::execute_overweight_page_updated()
    };
    BookStateFor::<T>::insert(&origin, &book_state);
    T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
    Ok(weight_counter.consumed().saturating_add(page_weight))
   },
  }
 }
1145
1146 fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1148 match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
1149 Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
1150 Ok(x) => x,
1151 }
1152 }
1153
 /// Remove a page without taking the service mutex; see `do_reap_page` for
 /// the guarded entry point.
 ///
 /// A page may be reaped if it lies before the servicing cursor and either
 /// (a) has no unprocessed messages left, or (b) is "cullable": the queue has
 /// accumulated more than `MaxStale` stale pages and this page is older than
 /// a computed watermark.
 fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
  let mut book_state = BookStateFor::<T>::get(origin);
  // Never touch pages that servicing has not yet passed.
  ensure!(page_index < book_state.begin, Error::<T>::NotReapable);

  let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;

  let reapable = page.remaining.is_zero();

  let cullable = || {
   let total_pages = book_state.count;
   let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);

   // Pages before `begin` that still hold unprocessed items.
   let stale_pages = total_pages - ready_pages;

   let max_stale = T::MaxStale::get();

   // How many pages beyond the allowed `max_stale` we currently hold
   // (`None` means we are within the allowance — nothing is cullable).
   let overflow = match stale_pages.checked_sub(max_stale + 1) {
    Some(x) => x + 1,
    None => return false,
   };

   // The permitted backlog shrinks as the overflow grows, but never
   // below `max_stale` pages.
   let backlog = (max_stale * max_stale / overflow).max(max_stale);

   let watermark = book_state.begin.saturating_sub(backlog);
   page_index < watermark
  };
  ensure!(reapable || cullable(), Error::<T>::NotReapable);

  Pages::<T>::remove(origin, page_index);
  debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
  book_state.count.saturating_dec();
  // Drop the page's still-unprocessed messages from the book totals.
  book_state.message_count.saturating_reduce(page.remaining.into() as u64);
  book_state.size.saturating_reduce(page.remaining_size.into() as u64);
  BookStateFor::<T>::insert(origin, &book_state);
  T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
  Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });

  Ok(())
 }
1218
 /// Service as many pages of the queue `origin` as `weight` allows.
 ///
 /// Returns whether any messages were processed, and the next ready queue
 /// (if any) so the caller can continue round-robin servicing. Unknits the
 /// queue from the ready ring once it runs empty.
 fn service_queue(
  origin: MessageOriginOf<T>,
  weight: &mut WeightMeter,
  overweight_limit: Weight,
 ) -> (bool, Option<MessageOriginOf<T>>) {
  use PageExecutionStatus::*;
  // Reserve the queue base cost plus a potential unknit up front.
  if weight
   .try_consume(
    T::WeightInfo::service_queue_base()
     .saturating_add(T::WeightInfo::ready_ring_unknit()),
   )
   .is_err()
  {
   return (false, None);
  }

  let mut book_state = BookStateFor::<T>::get(&origin);
  let mut total_processed = 0;
  // Paused queues stay in the ring but are skipped over.
  if T::QueuePausedQuery::is_paused(&origin) {
   let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
   return (false, next_ready);
  }

  while book_state.end > book_state.begin {
   let (processed, status) =
    Self::service_page(&origin, &mut book_state, weight, overweight_limit);
   total_processed.saturating_accrue(processed);
   match status {
    // Stop servicing this queue on weight exhaustion or no progress.
    Bailed | NoProgress => break,
    // Page done: advance `begin` and continue with the next page.
    NoMore => (),
   };
   book_state.begin.saturating_inc();
  }
  let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
  if book_state.begin >= book_state.end {
   // Queue has no ready pages left — take it out of the ready ring.
   if let Some(neighbours) = book_state.ready_neighbours.take() {
    Self::ready_ring_unknit(&origin, neighbours);
   } else if total_processed > 0 {
    defensive!("Freshly processed queue must have been ready");
   }
  }
  BookStateFor::<T>::insert(&origin, &book_state);
  if total_processed > 0 {
   T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
  }
  (total_processed > 0, next_ready)
 }
1272
 /// Service as many items of the page at `book_state.begin` as `weight`
 /// allows; removes the page from storage once it is fully processed.
 ///
 /// Returns the number of processed items together with the page's
 /// execution status.
 fn service_page(
  origin: &MessageOriginOf<T>,
  book_state: &mut BookStateOf<T>,
  weight: &mut WeightMeter,
  overweight_limit: Weight,
 ) -> (u32, PageExecutionStatus) {
  use PageExecutionStatus::*;
  // Charge the heavier of the two page base costs up front.
  if weight
   .try_consume(
    T::WeightInfo::service_page_base_completion()
     .max(T::WeightInfo::service_page_base_no_completion()),
   )
   .is_err()
  {
   return (0, Bailed);
  }

  let page_index = book_state.begin;
  let mut page = match Pages::<T>::get(origin, page_index) {
   Some(p) => p,
   None => {
    defensive!("message-queue: referenced page not found");
    return (0, NoMore);
   },
  };

  let mut total_processed = 0;

  // Execute as many items as possible.
  let status = loop {
   use ItemExecutionStatus::*;
   match Self::service_page_item(
    origin,
    page_index,
    book_state,
    &mut page,
    weight,
    overweight_limit,
   ) {
    Bailed => break PageExecutionStatus::Bailed,
    NoItem => break PageExecutionStatus::NoMore,
    NoProgress => break PageExecutionStatus::NoProgress,
    Executed(true) => total_processed.saturating_inc(),
    Executed(false) => (),
   }
  };

  if page.is_complete() {
   debug_assert!(status != Bailed, "we never bail if a page became complete");
   Pages::<T>::remove(origin, page_index);
   debug_assert!(book_state.count > 0, "completing a page implies there are pages");
   book_state.count.saturating_dec();
  } else {
   Pages::<T>::insert(origin, page_index, page);
  }
  (total_processed, status)
 }
1334
 /// Execute the next unprocessed item of `page`, updating `page` and
 /// `book_state` accordingly.
 ///
 /// Persists page and book to storage *before* processing so that a
 /// (re-entrant) message processor observes consistent state, then reloads
 /// both afterwards.
 pub(crate) fn service_page_item(
  origin: &MessageOriginOf<T>,
  page_index: PageIndex,
  book_state: &mut BookStateOf<T>,
  page: &mut PageOf<T>,
  weight: &mut WeightMeter,
  overweight_limit: Weight,
 ) -> ItemExecutionStatus {
  use MessageExecutionStatus::*;
  // Exit early once the page is done to save the `peek_first` decode.
  if page.is_complete() {
   return ItemExecutionStatus::NoItem;
  }
  if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
   return ItemExecutionStatus::Bailed;
  }

  let payload = &match page.peek_first() {
   Some(m) => m,
   None => return ItemExecutionStatus::NoItem,
  }[..];
  let payload_len = payload.len() as u64;

  // Store the current state before any processor callback can run.
  Pages::<T>::insert(origin, page_index, &*page);
  BookStateFor::<T>::insert(origin, &*book_state);

  let res = Self::process_message_payload(
   origin.clone(),
   page_index,
   page.first_index,
   payload,
   weight,
   overweight_limit,
  );

  // Reload: the processor may have changed storage (e.g. enqueued more).
  *book_state = BookStateFor::<T>::get(origin);
  if let Some(new_page) = Pages::<T>::get(origin, page_index) {
   *page = new_page;
  } else {
   defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
   return ItemExecutionStatus::NoItem;
  };

  let is_processed = match res {
   InsufficientWeight => return ItemExecutionStatus::Bailed,
   Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
   // Permanent failures and stack-limit hits count as processed here.
   Processed | Unprocessable { permanent: true } | StackLimitReached => true,
   // Overweight messages are skipped but kept for manual execution.
   Overweight => false,
  };

  if is_processed {
   book_state.message_count.saturating_dec();
   book_state.size.saturating_reduce(payload_len as u64);
  }
  page.skip_first(is_processed);
  ItemExecutionStatus::Executed(is_processed)
 }
1396
 /// Check the internal invariants of the pezpallet's storage.
 ///
 /// Verifies map integrity, per-book sanity bounds, that every queue reached
 /// by walking the ready ring is a well-formed ring member, and that each
 /// ready page's `remaining` counter matches a recount of its unprocessed
 /// items.
 #[cfg(any(test, feature = "try-runtime", feature = "std"))]
 pub fn do_try_state() -> Result<(), pezsp_runtime::TryRuntimeError> {
  ensure!(
   BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
   "Memory Corruption in BookStateFor"
  );
  ensure!(
   Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
   "Memory Corruption in Pages"
  );

  // Basic sanity bounds for every book.
  for book in BookStateFor::<T>::iter_values() {
   ensure!(book.end >= book.begin, "Invariant");
   ensure!(book.end < 1 << 30, "Likely overflow or corruption");
   ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
   ensure!(book.size < 1 << 30, "Likely overflow or corruption");
   ensure!(book.count < 1 << 30, "Likely overflow or corruption");

   let fp: QueueFootprint = book.into();
   ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
  }

  // Nothing else to check if the ready ring is empty.
  let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };

  // Walk the ready ring once around, starting at the service head.
  // NOTE(review): `bump_service_head` mutates `ServiceHead` in the
  // overlay; by the time the loop terminates it has gone full circle.
  while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
   ensure!(
    BookStateFor::<T>::contains_key(&head),
    "Service head must point to an existing book"
   );

   let head_book_state = BookStateFor::<T>::get(&head);
   ensure!(
    head_book_state.message_count > 0,
    "There must be some messages if in ReadyRing"
   );
   ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
   ensure!(
    head_book_state.end > head_book_state.begin,
    "End > Begin if unprocessed messages exists"
   );
   ensure!(
    head_book_state.ready_neighbours.is_some(),
    "There must be neighbours if in ReadyRing"
   );

   if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
    ensure!(
     head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
     "Can only happen if only queue in ReadyRing"
    );
   }

   for page_index in head_book_state.begin..head_book_state.end {
    let page = Pages::<T>::get(&head, page_index).unwrap();
    let remaining_messages = page.remaining;
    let mut counted_remaining_messages: u32 = 0;
    ensure!(
     remaining_messages > 0.into(),
     "These must be some messages that have not been processed yet!"
    );

    // Recount unprocessed items by walking the page.
    for i in 0..u32::MAX {
     if let Some((_, processed, _)) = page.peek_index(i as usize) {
      if !processed {
       counted_remaining_messages += 1;
      }
     } else {
      break;
     }
    }

    ensure!(
     remaining_messages.into() == counted_remaining_messages,
     "Memory Corruption"
    );
   }

   // Full circle: the next queue is where we started.
   if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
    break;
   }
  }
  Ok(())
 }
1503
 /// Render a human-readable dump of all queues, their pages and messages —
 /// intended for tests and debugging only.
 ///
 /// Processed messages are prefixed with `*`; the page at the book's `begin`
 /// cursor is marked with `>`.
 #[cfg(feature = "std")]
 pub fn debug_info() -> String {
  let mut info = String::new();
  for (origin, book_state) in BookStateFor::<T>::iter() {
   let mut queue = format!("queue {:?}:\n", &origin);
   let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
   pages.sort_by(|(a, _), (b, _)| a.cmp(b));
   for (page_index, mut page) in pages.into_iter() {
    let page_info = if book_state.begin == page_index { ">" } else { " " };
    let mut page_info = format!(
     "{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
     page_info, page_index, page.first, page.last, page.remaining
    );
    for i in 0..u32::MAX {
     if let Some((_, processed, message)) =
      page.peek_index(i.try_into().expect("std-only code"))
     {
      let msg = String::from_utf8_lossy(message);
      if processed {
       page_info.push('*');
      }
      page_info.push_str(&format!("{:?}, ", msg));
      // Advance the local copy's cursor; storage is untouched.
      page.skip_first(true);
     } else {
      break;
     }
    }
    page_info.push_str("]\n");
    queue.push_str(&page_info);
   }
   info.push_str(&queue);
  }
  info
 }
1553
	/// Execute a single message payload and report the outcome.
	///
	/// The weight consumed by the processor is drawn from `meter`. The
	/// processor runs inside a storage transaction: its storage changes are
	/// committed on `Ok(_)` and rolled back on `Err(_)`.
	///
	/// `overweight_limit` decides whether an over-weight message is parked as
	/// permanently overweight (an `OverweightEnqueued` event is emitted) or
	/// merely reported as lacking weight for now.
	fn process_message_payload(
		origin: MessageOriginOf<T>,
		page_index: PageIndex,
		message_index: T::Size,
		message: &[u8],
		meter: &mut WeightMeter,
		overweight_limit: Weight,
	) -> MessageExecutionStatus {
		// Default message id is the blake2-256 hash of the payload; the
		// processor receives `&mut id` and may overwrite it.
		let mut id = pezsp_io::hashing::blake2_256(message);
		use ProcessMessageError::*;
		let prev_consumed = meter.consumed();

		// Run the processor transactionally: commit on success, roll back
		// everything it wrote on failure.
		let transaction =
			storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
				let res =
					T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
				match &res {
					Ok(_) => TransactionOutcome::Commit(Ok(res)),
					Err(_) => TransactionOutcome::Rollback(Ok(res)),
				}
			});

		let transaction = match transaction {
			Ok(result) => result,
			_ => {
				// `with_transaction` itself failed — should not happen here;
				// treat the message as permanently unprocessable.
				defensive!(
					"Error occurred processing message, storage changes will be rolled back"
				);
				return MessageExecutionStatus::Unprocessable { permanent: true };
			},
		};

		match transaction {
			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
				// Heavier than any servicing run could ever afford: park it
				// for later manual execution via `execute_overweight`.
				Self::deposit_event(Event::<T>::OverweightEnqueued {
					id,
					origin,
					page_index,
					message_index,
				});
				MessageExecutionStatus::Overweight
			},
			Err(Overweight(_)) => {
				// Not enough weight left in this run, but it could fit later.
				MessageExecutionStatus::InsufficientWeight
			},
			Err(Yield) => {
				// Processor asked to retry later; not a permanent failure.
				MessageExecutionStatus::Unprocessable { permanent: false }
			},
			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::Unprocessable { permanent: true }
			},
			Err(error @ StackLimitReached) => {
				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
				MessageExecutionStatus::StackLimitReached
			},
			Ok(success) => {
				// Success: report the weight the processor actually consumed.
				let weight_used = meter.consumed().saturating_sub(prev_consumed);
				Self::deposit_event(Event::<T>::Processed {
					id: id.into(),
					origin,
					weight_used,
					success,
				});
				MessageExecutionStatus::Processed
			},
		}
	}
1635
	/// Common implementation backing [`ServiceQueues::service_queues`].
	///
	/// Services ready queues round-robin under `weight_limit` until the weight
	/// is exhausted, there are no ready queues left, or a full pass around the
	/// ready ring made no progress. `context` only controls defensive logging
	/// when not even a single message could be serviced.
	fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
		let mut weight = WeightMeter::with_limit(weight_limit);

		// Maximum weight a single message may consume before it is treated as
		// overweight; zero when the limit cannot cover the fixed overhead.
		let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
			if matches!(context, ServiceQueuesContext::OnInitialize) {
				defensive!("Not enough weight to service a single message.");
			}
			Weight::zero()
		});

		// Reentrancy guard: only one servicing run may be active at a time.
		match with_service_mutex(|| {
			let mut next = match Self::bump_service_head(&mut weight) {
				Some(h) => h,
				None => return weight.consumed(),
			};
			// Remembers the first queue of a no-progress streak; if we come
			// around the ring to it again without any progress, we stop.
			let mut last_no_progress = None;

			loop {
				let (progressed, n) =
					Self::service_queue(next.clone(), &mut weight, overweight_limit);
				next = match n {
					Some(n) => {
						if !progressed {
							if last_no_progress == Some(n.clone()) {
								break;
							}
							if last_no_progress.is_none() {
								last_no_progress = Some(next.clone())
							}
							n
						} else {
							// Progress resets the streak.
							last_no_progress = None;
							n
						}
					},
					None => break,
				}
			}
			weight.consumed()
		}) {
			// Mutex already taken (reentrant call): report weight used so far.
			Err(()) => weight.consumed(),
			Ok(w) => w,
		}
	}
1684}
1685
1686impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pezpallet<T> {
1687 fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
1688 Pezpallet::<T>::set_service_head(weight, origin)
1689 }
1690}
1691
/// Run `f` while holding the global service "mutex".
///
/// Returns `Err(())` when the mutex is already held, i.e. when this is called
/// re-entrantly from within another `with_service_mutex` invocation on the
/// same call stack. The token is restored on exit via `defer!`.
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
	// Declares a scoped `token` slot of type `Option<()>` (environmental's
	// thread-local mechanism); holding the `()` means holding the mutex.
	environmental::environmental!(token: Option<()>);

	token::using_once(&mut Some(()), || {
		// Take the token out of the slot. A re-entrant call finds `None` and
		// bails with `Err(())`; a missing slot is a defensive failure.
		let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;

		// Put the token back once `f` is done, even on early return.
		defer! {
			token::with(|t| {
				*t = Some(hold);
			});
		}

		Ok(f())
	})
}
1711
/// Provides a [`Get`] implementation returning the [`MaxEncodedLen`] of `T`
/// as a `u32`.
pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
	fn get() -> u32 {
		// NOTE(review): `as u32` truncates if the length ever exceeded
		// `u32::MAX` — presumably impossible for any practical type.
		T::max_encoded_len() as u32
	}
}
1719
/// The maximal message length that fits into a page of heap size `HeapSize`:
/// the heap size minus the encoded size of one [`ItemHeader`].
pub struct MaxMessageLen<Origin, Size, HeapSize>(
	core::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
	for MaxMessageLen<Origin, Size, HeapSize>
{
	fn get() -> u32 {
		// Saturates to zero if the item header alone exceeds the heap size.
		(HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
	}
}
1731
/// The maximal message length for this pallet's configuration.
pub type MaxMessageLenOf<T> =
	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded length of a message origin for this configuration.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The message origin type of the configured `MessageProcessor`.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The configured heap size, exposed as a `Get<u32>`.
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The [`Page`] type instantiated with this pallet's configuration.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The `BookState` keyed by this pallet's message origin.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;
1745
/// Adapts a `Get<O>` where `O: Into<u32>` into a `Get<u32>`.
pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
	fn get() -> u32 {
		T::get().into()
	}
}
1754
1755impl<T: Config> ServiceQueues for Pezpallet<T> {
1756 type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);
1757
1758 fn service_queues(weight_limit: Weight) -> Weight {
1759 Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
1760 }
1761
1762 fn execute_overweight(
1766 weight_limit: Weight,
1767 (message_origin, page, index): Self::OverweightMessageAddress,
1768 ) -> Result<Weight, ExecuteOverweightError> {
1769 let mut weight = WeightMeter::with_limit(weight_limit);
1770 if weight
1771 .try_consume(
1772 T::WeightInfo::execute_overweight_page_removed()
1773 .max(T::WeightInfo::execute_overweight_page_updated()),
1774 )
1775 .is_err()
1776 {
1777 return Err(ExecuteOverweightError::InsufficientWeight);
1778 }
1779
1780 Pezpallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining())
1781 .map_err(|e| match e {
1782 Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
1783 Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
1784 Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
1785 Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued => {
1786 ExecuteOverweightError::NotFound
1787 },
1788 Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
1789 _ => ExecuteOverweightError::Other,
1790 })
1791 }
1792}
1793
1794impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pezpallet<T> {
1795 type MaxMessageLen =
1796 MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;
1797
1798 fn enqueue_message(
1799 message: BoundedSlice<u8, Self::MaxMessageLen>,
1800 origin: <T::MessageProcessor as ProcessMessage>::Origin,
1801 ) {
1802 Self::do_enqueue_messages(&origin, [message].into_iter());
1803 let book_state = BookStateFor::<T>::get(&origin);
1804 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1805 }
1806
1807 fn enqueue_messages<'a>(
1808 messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
1809 origin: <T::MessageProcessor as ProcessMessage>::Origin,
1810 ) {
1811 Self::do_enqueue_messages(&origin, messages);
1812 let book_state = BookStateFor::<T>::get(&origin);
1813 T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1814 }
1815
1816 fn sweep_queue(origin: MessageOriginOf<T>) {
1817 if !BookStateFor::<T>::contains_key(&origin) {
1818 return;
1819 }
1820 let mut book_state = BookStateFor::<T>::get(&origin);
1821 book_state.begin = book_state.end;
1822 if let Some(neighbours) = book_state.ready_neighbours.take() {
1823 Self::ready_ring_unknit(&origin, neighbours);
1824 }
1825 BookStateFor::<T>::insert(&origin, &book_state);
1826 }
1827}
1828
impl<T: Config> QueueFootprintQuery<MessageOriginOf<T>> for Pezpallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	/// Simulate appending `msgs` to the queue, recording the footprint after
	/// each appended message. Stops early once the simulated total page count
	/// exceeds `total_pages_limit`, returning the footprints gathered so far.
	fn get_batches_footprints<'a>(
		origin: MessageOriginOf<T>,
		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		total_pages_limit: u32,
	) -> BatchesFootprints {
		let mut batches_footprints = BatchesFootprints::default();

		let mut new_page = false;
		let mut total_pages_count = 0;
		// Write position within the current last page. Initialized to the full
		// heap size so that the first message opens a new page when the queue
		// has none yet.
		let mut current_page_pos: usize = T::HeapSize::get().into() as usize;

		let book = BookStateFor::<T>::get(&origin);
		if book.end > book.begin {
			// Queue already has pages: start from the last page's current
			// heap position.
			total_pages_count = book.end - book.begin;
			if let Some(page) = Pages::<T>::get(origin, book.end - 1) {
				current_page_pos = page.heap_pos();
				batches_footprints.first_page_pos = current_page_pos;
			}
		}

		let mut msgs = msgs.peekable();
		while let Some(msg) = msgs.peek() {
			if total_pages_count > total_pages_limit {
				return batches_footprints;
			}

			match Page::<T::Size, T::HeapSize>::can_append_message_at(current_page_pos, msg.len()) {
				Ok(new_pos) => {
					// Fits on the current page: record it and consume it.
					current_page_pos = new_pos;
					batches_footprints.push(msg, new_page);
					new_page = false;
					msgs.next();
				},
				Err(_) => {
					// Does not fit: open a fresh page and retry the same
					// message (deliberately not consumed) next iteration.
					new_page = true;
					total_pages_count += 1;
					current_page_pos = 0;
				},
			}
		}

		batches_footprints
	}

	fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
		// The queue footprint is derived directly from its book state.
		BookStateFor::<T>::get(&origin).into()
	}
}