Skip to main content

pallet_message_queue/
lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: Apache-2.0
5
6// Licensed under the Apache License, Version 2.0 (the "License");
7// you may not use this file except in compliance with the License.
8// You may obtain a copy of the License at
9//
10// 	http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! # Generalized Message Queue Pallet
19//!
20//! Provides generalized message queuing and processing capabilities on a per-queue basis for
21//! arbitrary use-cases.
22//!
23//! # Design Goals
24//!
25//! 1. Minimal assumptions about `Message`s and `MessageOrigin`s. Both should be MEL bounded blobs.
26//!  This ensures the generality and reusability of the pallet.
27//! 2. Well known and tightly limited pre-dispatch PoV weights, especially for message execution.
28//!  This is paramount for the success of the pallet since message execution is done in
29//!  `on_initialize` which must _never_ under-estimate its PoV weight. It also needs a frugal PoV
30//!  footprint since PoV is scarce and this is (possibly) done in every block. This must also hold
31//! in  the presence of unpredictable message size distributions.
32//! 3. Usable as XCMP, DMP and UMP message/dispatch queue - possibly through adapter types.
33//!
34//! # Design
35//!
36//! The pallet has means to enqueue, store and process messages. This is implemented by having
37//! *queues* which store enqueued messages and can be *served* to process said messages. A queue is
38//! identified by its origin in the `BookStateFor`. Each message has an origin which defines into
39//! which queue it will be stored. Messages are stored by being appended to the last [`Page`] of a
40//! book. Each book keeps track of its pages by indexing `Pages`. The `ReadyRing` contains all
41//! queues which hold at least one unprocessed message and are thereby *ready* to be serviced. The
42//! `ServiceHead` indicates which *ready* queue is the next to be serviced.
43//! The pallet implements [`frame_support::traits::EnqueueMessage`],
44//! [`frame_support::traits::ServiceQueues`] and has [`frame_support::traits::ProcessMessage`] and
45//! [`OnQueueChanged`] hooks to communicate with the outside world.
46//!
47//! NOTE: The storage items are not linked since they are not public.
48//!
49//! **Message Execution**
50//!
51//! Executing a message is offloaded to the [`Config::MessageProcessor`] which contains the actual
52//! logic of how to handle the message since they are blobs. Storage changes are not rolled back on
53//! error.
54//!
55//! A failed message can be temporarily or permanently overweight. The pallet will perpetually try
56//! to execute a temporarily overweight message. A permanently overweight message is skipped and
57//! must be executed manually.
58//!
59//! **Reentrancy**
60//!
61//! This pallet has two entry points for executing (possibly recursive) logic;
62//! [`Pallet::service_queues`] and [`Pallet::execute_overweight`]. Both entry points are guarded by
63//! the same mutex to error on reentrancy. The only functions that are explicitly **allowed** to be
64//! called by a message processor are: [`Pallet::enqueue_message`] and
65//! [`Pallet::enqueue_messages`]. All other functions are forbidden and error with
66//! [`Error::RecursiveDisallowed`].
67//!
68//! **Pagination**
69//!
70//! Queues are stored in a *paged* manner by splitting their messages into [`Page`]s. This results
71//! in a lot of complexity when implementing the pallet but is completely necessary to achieve the
72//! second #[Design Goal](design-goals). The problem comes from the fact a message can *possibly* be
73//! quite large, lets say 64KiB. This then results in a *MEL* of at least 64KiB which results in a
74//! PoV of at least 64KiB. Now we have the assumption that most messages are much shorter than their
75//! maximum allowed length. This would result in most messages having a pre-dispatch PoV size which
76//! is much larger than their post-dispatch PoV size, possibly by a factor of thousand. Disregarding
77//! this observation would cripple the processing power of the pallet since it cannot straighten out
78//! this discrepancy at runtime. Conceptually, the implementation is packing as many messages into a
79//! single bounded vec, as actually fit into the bounds. This reduces the wasted PoV.
80//!
81//! **Page Data Layout**
82//!
83//! A Page contains a heap which holds all its messages. The heap is built by concatenating
84//! `(ItemHeader, Message)` pairs. The [`ItemHeader`] contains the length of the message which is
85//! needed for retrieving it. This layout allows for constant access time of the next message and
86//! linear access time for any message in the page. The header must remain minimal to reduce its PoV
87//! impact.
88//!
89//! **Weight Metering**
90//!
91//! The pallet utilizes the [`sp_weights::WeightMeter`] to manually track its consumption to always
92//! stay within the required limit. This implies that the message processor hook can calculate the
93//! weight of a message without executing it. This restricts the possible use-cases but is necessary
94//! since the pallet runs in `on_initialize` which has a hard weight limit. The weight meter is used
95//! in a way that `can_accrue` and `check_accrue` are always used to check the remaining weight of
96//! an operation before committing to it. The process of exiting due to insufficient weight is
97//! termed "bailing".
98//!
99//! # Scenario: Message enqueuing
100//!
101//! A message `m` is enqueued for origin `o` into queue `Q[o]` through
102//! [`frame_support::traits::EnqueueMessage::enqueue_message`]`(m, o)`.
103//!
104//! First the queue is either loaded if it exists or otherwise created with empty default values.
105//! The message is then inserted to the queue by appended it into its last `Page` or by creating a
106//! new `Page` just for `m` if it does not fit in there. The number of messages in the `Book` is
107//! incremented.
108//!
109//! `Q[o]` is now *ready* which will eventually result in `m` being processed.
110//!
111//! # Scenario: Message processing
112//!
113//! The pallet runs each block in `on_initialize` or when being manually called through
114//! [`frame_support::traits::ServiceQueues::service_queues`].
115//!
116//! First it tries to "rotate" the `ReadyRing` by one through advancing the `ServiceHead` to the
117//! next *ready* queue. It then starts to service this queue by servicing as many pages of it as
118//! possible. Servicing a page means to execute as many message of it as possible. Each executed
119//! message is marked as *processed* if the [`Config::MessageProcessor`] return Ok. An event
120//! [`Event::Processed`] is emitted afterwards. It is possible that the weight limit of the pallet
121//! will never allow a specific message to be executed. In this case it remains as unprocessed and
122//! is skipped. This process stops if either there are no more messages in the queue or the
123//! remaining weight became insufficient to service this queue. If there is enough weight it tries
124//! to advance to the next *ready* queue and service it. This continues until there are no more
125//! queues on which it can make progress or not enough weight to check that.
126//!
127//! # Scenario: Overweight execution
128//!
129//! A permanently over-weight message which was skipped by the message processing will never be
130//! executed automatically through `on_initialize` nor by calling
131//! [`frame_support::traits::ServiceQueues::service_queues`].
132//!
133//! Manual intervention in the form of
134//! [`frame_support::traits::ServiceQueues::execute_overweight`] is necessary. Overweight messages
135//! emit an [`Event::OverweightEnqueued`] event which can be used to extract the arguments for
136//! manual execution. This only works on permanently overweight messages. There is no guarantee that
137//! this will work since the message could be part of a stale page and be reaped before execution
138//! commences.
139//!
140//! # Terminology
141//!
142//! - `Message`: A blob of data into which the pallet has no introspection, defined as
143//! [`BoundedSlice<u8, MaxMessageLenOf<T>>`]. The message length is limited by [`MaxMessageLenOf`]
144//! which is calculated from [`Config::HeapSize`] and [`ItemHeader::max_encoded_len()`].
145//! - `MessageOrigin`: A generic *origin* of a message, defined as [`MessageOriginOf`]. The
146//! requirements for it are kept minimal to remain as generic as possible. The type is defined in
147//! [`frame_support::traits::ProcessMessage::Origin`].
148//! - `Page`: An array of `Message`s, see [`Page`]. Can never be empty.
149//! - `Book`: A list of `Page`s, see [`BookState`]. Can be empty.
150//! - `Queue`: A `Book` together with an `MessageOrigin` which can be part of the `ReadyRing`. Can
151//!   be empty.
152//! - `ReadyRing`: A double-linked list which contains all *ready* `Queue`s. It chains together the
153//!   queues via their `ready_neighbours` fields. A `Queue` is *ready* if it contains at least one
154//!   `Message` which can be processed. Can be empty.
155//! - `ServiceHead`: A pointer into the `ReadyRing` to the next `Queue` to be serviced.
156//! - (`un`)`processed`: A message is marked as *processed* after it was executed by the pallet. A
157//!   message which was either: not yet executed or could not be executed remains as `unprocessed`
158//!   which is the default state for a message after being enqueued.
159//! - `knitting`/`unknitting`: The means of adding or removing a `Queue` from the `ReadyRing`.
160//! - `MEL`: The Max Encoded Length of a type, see [`codec::MaxEncodedLen`].
161//! - `Reentrance`: To enter an execution context again before it has completed.
162//!
163//! # Properties
164//!
165//! **Liveness - Enqueueing**
166//!
167//! It is always possible to enqueue any message for any `MessageOrigin`.
168//!
169//! **Liveness - Processing**
170//!
171//! `on_initialize` always respects its finite weight-limit.
172//!
173//! **Progress - Enqueueing**
174//!
175//! An enqueued message immediately becomes *unprocessed* and thereby eligible for execution.
176//!
177//! **Progress - Processing**
178//!
179//! The pallet will execute at least one unprocessed message per block, if there is any. Ensuring
180//! this property needs careful consideration of the concrete weights, since it is possible that the
181//! weight limit of `on_initialize` never allows for the execution of even one message; trivially if
182//! the limit is set to zero. `integrity_test` can be used to ensure that this property holds.
183//!
184//! **Fairness - Enqueuing**
185//!
186//! Enqueueing a message for a specific `MessageOrigin` does not influence the ability to enqueue a
187//! message for the same of any other `MessageOrigin`; guaranteed by **Liveness - Enqueueing**.
188//!
189//! **Fairness - Processing**
190//!
191//! The average amount of weight available for message processing is the same for each queue if the
192//! number of queues is constant. Creating a new queue must therefore be, possibly economically,
193//! expensive. Currently this is archived by having one queue per para-chain/thread, which keeps the
194//! number of queues within `O(n)` and should be "good enough".
195
196#![deny(missing_docs)]
197#![cfg_attr(not(feature = "std"), no_std)]
198
199mod benchmarking;
200mod integration_test;
201mod mock;
202pub mod mock_helpers;
203mod tests;
204pub mod weights;
205
206extern crate alloc;
207
208use alloc::vec::Vec;
209use codec::{Codec, ConstEncodedLen, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
210use core::{fmt::Debug, ops::Deref};
211use frame_support::{
212	defensive,
213	pallet_prelude::*,
214	traits::{
215		BatchesFootprints, Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
216		ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
217		QueueFootprintQuery, QueuePausedQuery, ServiceQueues,
218	},
219	BoundedSlice, CloneNoBound, DefaultNoBound,
220};
221use frame_system::pallet_prelude::*;
222pub use pallet::*;
223use scale_info::TypeInfo;
224use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
225use sp_core::{defer, H256};
226use sp_runtime::{
227	traits::{One, Zero},
228	SaturatedConversion, Saturating, TransactionOutcome,
229};
230use sp_weights::WeightMeter;
231pub use weights::WeightInfo;
232
233/// Type for identifying a page.
234type PageIndex = u32;
235
236/// Data encoded and prefixed to the encoded `MessageItem`.
237#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
238pub struct ItemHeader<Size> {
239	/// The length of this item, not including the size of this header. The next item of the page
240	/// follows immediately after the payload of this item.
241	payload_len: Size,
242	/// Whether this item has been processed.
243	is_processed: bool,
244}
245
246impl<Size: ConstEncodedLen> ConstEncodedLen for ItemHeader<Size> {} // marker
247
248/// A page of messages. Pages always contain at least one item.
249#[derive(CloneNoBound, Encode, Decode, DebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen)]
250#[scale_info(skip_type_params(HeapSize))]
251#[codec(mel_bound(Size: MaxEncodedLen))]
252pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
253	/// Messages remaining to be processed; this includes overweight messages which have been
254	/// skipped.
255	remaining: Size,
256	/// The size of all remaining messages to be processed.
257	///
258	/// Includes overweight messages outside of the `first` to `last` window.
259	remaining_size: Size,
260	/// The number of items before the `first` item in this page.
261	first_index: Size,
262	/// The heap-offset of the header of the first message item in this page which is ready for
263	/// processing.
264	first: Size,
265	/// The heap-offset of the header of the last message item in this page.
266	last: Size,
267	/// The heap.
268	heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
269}
270
271impl<
272		Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
273		HeapSize: Get<Size>,
274	> Page<Size, HeapSize>
275where
276	ItemHeader<Size>: ConstEncodedLen,
277{
278	/// Create a [`Page`] from one unprocessed message.
279	fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
280		let payload_len = message.len();
281		let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
282		let payload_len = payload_len.saturated_into();
283		let header = ItemHeader::<Size> { payload_len, is_processed: false };
284
285		let mut heap = Vec::with_capacity(data_len);
286		header.using_encoded(|h| heap.extend_from_slice(h));
287		heap.extend_from_slice(message.deref());
288
289		Page {
290			remaining: One::one(),
291			remaining_size: payload_len,
292			first_index: Zero::zero(),
293			first: Zero::zero(),
294			last: Zero::zero(),
295			heap: BoundedVec::defensive_truncate_from(heap),
296		}
297	}
298
299	/// The heap position where a new message can be appended to the current page.
300	fn heap_pos(&self) -> usize {
301		// The heap is actually a `Vec`, so the place where we can append data
302		// is the end of the `Vec`.
303		self.heap.len()
304	}
305
306	/// Check if a message can be appended to the current page at the provided heap position.
307	///
308	/// On success, returns the resulting position in the page where a new payload can be
309	/// appended.
310	fn can_append_message_at(pos: usize, message_len: usize) -> Result<usize, ()> {
311		let header_size = ItemHeader::<Size>::max_encoded_len();
312		let data_len = header_size.saturating_add(message_len);
313		let heap_size = HeapSize::get().into() as usize;
314		let new_pos = pos.saturating_add(data_len);
315		if new_pos <= heap_size {
316			Ok(new_pos)
317		} else {
318			Err(())
319		}
320	}
321
322	/// Try to append one message to a page.
323	fn try_append_message<T: Config>(
324		&mut self,
325		message: BoundedSlice<u8, MaxMessageLenOf<T>>,
326	) -> Result<(), ()> {
327		let pos = self.heap_pos();
328		Self::can_append_message_at(pos, message.len())?;
329		let payload_len = message.len().saturated_into();
330		let header = ItemHeader::<Size> { payload_len, is_processed: false };
331
332		let mut heap = core::mem::take(&mut self.heap).into_inner();
333		header.using_encoded(|h| heap.extend_from_slice(h));
334		heap.extend_from_slice(message.deref());
335		self.heap = BoundedVec::defensive_truncate_from(heap);
336		self.last = pos.saturated_into();
337		self.remaining.saturating_inc();
338		self.remaining_size.saturating_accrue(payload_len);
339		Ok(())
340	}
341
342	/// Returns the first message in the page without removing it.
343	///
344	/// SAFETY: Does not panic even on corrupted storage.
345	fn peek_first(&self) -> Option<BoundedSlice<'_, u8, IntoU32<HeapSize, Size>>> {
346		if self.first > self.last {
347			return None;
348		}
349		let f = (self.first.into() as usize).min(self.heap.len());
350		let mut item_slice = &self.heap[f..];
351		if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
352			let payload_len = h.payload_len.into() as usize;
353			if payload_len <= item_slice.len() {
354				// impossible to truncate since is sliced up from `self.heap: BoundedVec<u8,
355				// HeapSize>`
356				return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]));
357			}
358		}
359		defensive!("message-queue: heap corruption");
360		None
361	}
362
363	/// Point `first` at the next message, marking the first as processed if `is_processed` is true.
364	fn skip_first(&mut self, is_processed: bool) {
365		let f = (self.first.into() as usize).min(self.heap.len());
366		if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
367			if is_processed && !h.is_processed {
368				h.is_processed = true;
369				h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
370				self.remaining.saturating_dec();
371				self.remaining_size.saturating_reduce(h.payload_len);
372			}
373			self.first
374				.saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
375			self.first.saturating_accrue(h.payload_len);
376			self.first_index.saturating_inc();
377		}
378	}
379
380	/// Return the message with index `index` in the form of `(position, processed, message)`.
381	fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
382		let mut pos = 0;
383		let mut item_slice = &self.heap[..];
384		let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
385		for _ in 0..index {
386			let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
387			let item_len = h.payload_len.into() as usize;
388			if item_slice.len() < item_len {
389				return None;
390			}
391			item_slice = &item_slice[item_len..];
392			pos.saturating_accrue(header_len.saturating_add(item_len));
393		}
394		let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
395		if item_slice.len() < h.payload_len.into() as usize {
396			return None;
397		}
398		item_slice = &item_slice[..h.payload_len.into() as usize];
399		Some((pos, h.is_processed, item_slice))
400	}
401
402	/// Set the `is_processed` flag for the item at `pos` to be `true` if not already and decrement
403	/// the `remaining` counter of the page.
404	///
405	/// Does nothing if no [`ItemHeader`] could be decoded at the given position.
406	fn note_processed_at_pos(&mut self, pos: usize) {
407		if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
408			if !h.is_processed {
409				h.is_processed = true;
410				h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
411				self.remaining.saturating_dec();
412				self.remaining_size.saturating_reduce(h.payload_len);
413			}
414		}
415	}
416
417	/// Returns whether the page is *complete* which means that no messages remain.
418	fn is_complete(&self) -> bool {
419		self.remaining.is_zero()
420	}
421}
422
423/// A single link in the double-linked Ready Ring list.
424#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, Debug, PartialEq)]
425pub struct Neighbours<MessageOrigin> {
426	/// The previous queue.
427	prev: MessageOrigin,
428	/// The next queue.
429	next: MessageOrigin,
430}
431
432/// The state of a queue as represented by a book of its pages.
433///
434/// Each queue has exactly one book which holds all of its pages. All pages of a book combined
435/// contain all of the messages of its queue; hence the name *Book*.
436/// Books can be chained together in a double-linked fashion through their `ready_neighbours` field.
437#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, Debug)]
438pub struct BookState<MessageOrigin> {
439	/// The first page with some items to be processed in it. If this is `>= end`, then there are
440	/// no pages with items to be processing in them.
441	begin: PageIndex,
442	/// One more than the last page with some items to be processed in it.
443	end: PageIndex,
444	/// The number of pages stored at present.
445	///
446	/// This might be larger than `end-begin`, because we keep pages with unprocessed overweight
447	/// messages outside of the end/begin window.
448	count: PageIndex,
449	/// If this book has any ready pages, then this will be `Some` with the previous and next
450	/// neighbours. This wraps around.
451	ready_neighbours: Option<Neighbours<MessageOrigin>>,
452	/// The number of unprocessed messages stored at present.
453	message_count: u64,
454	/// The total size of all unprocessed messages stored at present.
455	size: u64,
456}
457
458impl<MessageOrigin> Default for BookState<MessageOrigin> {
459	fn default() -> Self {
460		Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
461	}
462}
463
464impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
465	fn from(book: BookState<MessageOrigin>) -> Self {
466		QueueFootprint {
467			pages: book.count,
468			ready_pages: book.end.defensive_saturating_sub(book.begin),
469			storage: Footprint { count: book.message_count, size: book.size },
470		}
471	}
472}
473
474/// Handler code for when the items in a queue change.
475pub trait OnQueueChanged<Id> {
476	/// Note that the queue `id` now has `item_count` items in it, taking up `items_size` bytes.
477	fn on_queue_changed(id: Id, fp: QueueFootprint);
478}
479
480impl<Id> OnQueueChanged<Id> for () {
481	fn on_queue_changed(_: Id, _: QueueFootprint) {}
482}
483
484/// Allows to force the processing head to a specific queue.
485pub trait ForceSetHead<O> {
486	/// Set the `ServiceHead` to `origin`.
487	///
488	/// This function:
489	/// - `Err`: Queue did not exist, not enough weight or other error.
490	/// - `Ok(true)`: The service head was updated.
491	/// - `Ok(false)`: The service head was not updated since the queue is empty.
492	fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
493}
494
495#[frame_support::pallet]
496pub mod pallet {
497	use super::*;
498
499	#[pallet::pallet]
500	pub struct Pallet<T>(_);
501
502	/// The module configuration trait.
503	#[pallet::config]
504	pub trait Config: frame_system::Config {
505		/// The overarching event type.
506		#[allow(deprecated)]
507		type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
508
509		/// Weight information for extrinsics in this pallet.
510		type WeightInfo: WeightInfo;
511
512		/// Processor for a message.
513		///
514		/// Storage changes are not rolled back on error.
515		///
516		/// # Benchmarking
517		///
518		/// Must be set to [`mock_helpers::NoopMessageProcessor`] for benchmarking.
519		/// Other message processors that consumes exactly (1, 1) weight for any give message will
520		/// work as well. Otherwise the benchmarking will also measure the weight of the message
521		/// processor, which is not desired.
522		type MessageProcessor: ProcessMessage;
523
524		/// Page/heap size type.
525		type Size: BaseArithmetic
526			+ Unsigned
527			+ Copy
528			+ Into<u32>
529			+ Member
530			+ Encode
531			+ Decode
532			+ MaxEncodedLen
533			+ ConstEncodedLen
534			+ TypeInfo
535			+ Default;
536
537		/// Code to be called when a message queue changes - either with items introduced or
538		/// removed.
539		type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;
540
541		/// Queried by the pallet to check whether a queue can be serviced.
542		///
543		/// This also applies to manual servicing via `execute_overweight` and `service_queues`. The
544		/// value of this is only polled once before servicing the queue. This means that changes to
545		/// it that happen *within* the servicing will not be reflected.
546		type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;
547
548		/// The size of the page; this implies the maximum message size which can be sent.
549		///
550		/// A good value depends on the expected message sizes, their weights, the weight that is
551		/// available for processing them and the maximal needed message size. The maximal message
552		/// size is slightly lower than this as defined by [`MaxMessageLenOf`].
553		#[pallet::constant]
554		type HeapSize: Get<Self::Size>;
555
556		/// The maximum number of stale pages (i.e. of overweight messages) allowed before culling
557		/// can happen. Once there are more stale pages than this, then historical pages may be
558		/// dropped, even if they contain unprocessed overweight messages.
559		#[pallet::constant]
560		type MaxStale: Get<u32>;
561
562		/// The amount of weight (if any) which should be provided to the message queue for
563		/// servicing enqueued items `on_initialize`.
564		///
565		/// This may be legitimately `None` in the case that you will call
566		/// `ServiceQueues::service_queues` manually or set [`Self::IdleMaxServiceWeight`] to have
567		/// it run in `on_idle`.
568		#[pallet::constant]
569		type ServiceWeight: Get<Option<Weight>>;
570
571		/// The maximum amount of weight (if any) to be used from remaining weight `on_idle` which
572		/// should be provided to the message queue for servicing enqueued items `on_idle`.
573		/// Useful for parachains to process messages at the same block they are received.
574		///
575		/// If `None`, it will not call `ServiceQueues::service_queues` in `on_idle`.
576		#[pallet::constant]
577		type IdleMaxServiceWeight: Get<Option<Weight>>;
578	}
579
580	#[pallet::event]
581	#[pallet::generate_deposit(pub(super) fn deposit_event)]
582	pub enum Event<T: Config> {
583		/// Message discarded due to an error in the `MessageProcessor` (usually a format error).
584		ProcessingFailed {
585			/// The `blake2_256` hash of the message.
586			id: H256,
587			/// The queue of the message.
588			origin: MessageOriginOf<T>,
589			/// The error that occurred.
590			///
591			/// This error is pretty opaque. More fine-grained errors need to be emitted as events
592			/// by the `MessageProcessor`.
593			error: ProcessMessageError,
594		},
595		/// Message is processed.
596		Processed {
597			/// The `blake2_256` hash of the message.
598			id: H256,
599			/// The queue of the message.
600			origin: MessageOriginOf<T>,
601			/// How much weight was used to process the message.
602			weight_used: Weight,
603			/// Whether the message was processed.
604			///
605			/// Note that this does not mean that the underlying `MessageProcessor` was internally
606			/// successful. It *solely* means that the MQ pallet will treat this as a success
607			/// condition and discard the message. Any internal error needs to be emitted as events
608			/// by the `MessageProcessor`.
609			success: bool,
610		},
611		/// Message placed in overweight queue.
612		OverweightEnqueued {
613			/// The `blake2_256` hash of the message.
614			id: [u8; 32],
615			/// The queue of the message.
616			origin: MessageOriginOf<T>,
617			/// The page of the message.
618			page_index: PageIndex,
619			/// The index of the message within the page.
620			message_index: T::Size,
621		},
622		/// This page was reaped.
623		PageReaped {
624			/// The queue of the page.
625			origin: MessageOriginOf<T>,
626			/// The index of the page.
627			index: PageIndex,
628		},
629	}
630
631	#[pallet::error]
632	pub enum Error<T> {
633		/// Page is not reapable because it has items remaining to be processed and is not old
634		/// enough.
635		NotReapable,
636		/// Page to be reaped does not exist.
637		NoPage,
638		/// The referenced message could not be found.
639		NoMessage,
640		/// The message was already processed and cannot be processed again.
641		AlreadyProcessed,
642		/// The message is queued for future execution.
643		Queued,
644		/// There is temporarily not enough weight to continue servicing messages.
645		InsufficientWeight,
646		/// This message is temporarily unprocessable.
647		///
648		/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
649		/// retrying.
650		TemporarilyUnprocessable,
651		/// The queue is paused and no message can be executed from it.
652		///
653		/// This can change at any time and may resolve in the future by re-trying.
654		QueuePaused,
655		/// Another call is in progress and needs to finish before this call can happen.
656		RecursiveDisallowed,
657	}
658
659	/// The index of the first and last (non-empty) pages.
660	#[pallet::storage]
661	pub type BookStateFor<T: Config> =
662		StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;
663
664	/// The origin at which we should begin servicing.
665	#[pallet::storage]
666	pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;
667
668	/// The map of page indices to pages.
669	#[pallet::storage]
670	pub type Pages<T: Config> = StorageDoubleMap<
671		_,
672		Twox64Concat,
673		MessageOriginOf<T>,
674		Twox64Concat,
675		PageIndex,
676		Page<T::Size, T::HeapSize>,
677		OptionQuery,
678	>;
679
680	#[pallet::hooks]
681	impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
682		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
683			if let Some(weight_limit) = T::ServiceWeight::get() {
684				Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
685			} else {
686				Weight::zero()
687			}
688		}
689
690		fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
691			if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
692				// Make use of the remaining weight to process enqueued messages.
693				Self::service_queues_impl(
694					weight_limit.min(remaining_weight),
695					ServiceQueuesContext::OnIdle,
696				)
697			} else {
698				Weight::zero()
699			}
700		}
701
702		#[cfg(feature = "try-runtime")]
703		fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
704			Self::do_try_state()
705		}
706
707		/// Check all compile-time assumptions about [`crate::Config`].
708		#[cfg(test)]
709		fn integrity_test() {
710			Self::do_integrity_test().expect("Pallet config is valid; qed")
711		}
712	}
713
714	#[pallet::call]
715	impl<T: Config> Pallet<T> {
716		/// Remove a page which has no more messages remaining to be processed or is stale.
717		#[pallet::call_index(0)]
718		#[pallet::weight(T::WeightInfo::reap_page())]
719		pub fn reap_page(
720			origin: OriginFor<T>,
721			message_origin: MessageOriginOf<T>,
722			page_index: PageIndex,
723		) -> DispatchResult {
724			ensure_signed(origin)?;
725			Self::do_reap_page(&message_origin, page_index)
726		}
727
728		/// Execute an overweight message.
729		///
730		/// Temporary processing errors will be propagated whereas permanent errors are treated
731		/// as success condition.
732		///
733		/// - `origin`: Must be `Signed`.
734		/// - `message_origin`: The origin from which the message to be executed arrived.
735		/// - `page`: The page in the queue in which the message to be executed is sitting.
736		/// - `index`: The index into the queue of the message to be executed.
737		/// - `weight_limit`: The maximum amount of weight allowed to be consumed in the execution
738		///   of the message.
739		///
740		/// Benchmark complexity considerations: O(index + weight_limit).
741		#[pallet::call_index(1)]
742		#[pallet::weight(
743			T::WeightInfo::execute_overweight_page_updated().max(
744			T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
745		)]
746		pub fn execute_overweight(
747			origin: OriginFor<T>,
748			message_origin: MessageOriginOf<T>,
749			page: PageIndex,
750			index: T::Size,
751			weight_limit: Weight,
752		) -> DispatchResultWithPostInfo {
753			ensure_signed(origin)?;
754			let actual_weight =
755				Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
756			Ok(Some(actual_weight).into())
757		}
758	}
759}
760
761/// The status of a page after trying to execute its next message.
762#[derive(PartialEq, Debug)]
763enum PageExecutionStatus {
764	/// The execution bailed because there was not enough weight remaining.
765	Bailed,
766	/// The page did not make any progress on its execution.
767	///
768	/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
769	NoProgress,
770	/// No more messages could be loaded. This does _not_ imply `page.is_complete()`.
771	///
772	/// The reasons for this status are:
773	///  - The end of the page is reached but there could still be skipped messages.
774	///  - The storage is corrupted.
775	NoMore,
776}
777
778/// The status after trying to execute the next item of a [`Page`].
779#[derive(PartialEq, Debug)]
780enum ItemExecutionStatus {
781	/// The execution bailed because there was not enough weight remaining.
782	Bailed,
783	/// The item did not make any progress on its execution.
784	///
785	/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
786	NoProgress,
787	/// The item was not found.
788	NoItem,
789	/// Whether the execution of an item resulted in it being processed.
790	///
791	/// One reason for `false` would be permanently overweight.
792	Executed(bool),
793}
794
795/// The status of an attempt to process a message.
796#[derive(PartialEq)]
797enum MessageExecutionStatus {
798	/// There is not enough weight remaining at present.
799	InsufficientWeight,
800	/// There will never be enough weight.
801	Overweight,
802	/// The message was processed successfully.
803	Processed,
804	/// The message was processed and resulted in a, possibly permanent, error.
805	Unprocessable { permanent: bool },
806	/// The stack depth limit was reached.
807	///
808	/// We cannot just return `Unprocessable` in this case, because the processability of the
809	/// message depends on how the function was called. This may be a permanent error if it was
810	/// called by a top-level function, or a transient error if it was already called in a nested
811	/// function.
812	StackLimitReached,
813}
814
815/// The context to pass to [`Pallet::service_queues_impl`] through on_idle and on_initialize hooks
816/// We don't want to throw the defensive message if called from on_idle hook
817#[derive(PartialEq)]
818enum ServiceQueuesContext {
819	/// Context of on_idle hook.
820	OnIdle,
821	/// Context of on_initialize hook.
822	OnInitialize,
823	/// Context `service_queues` trait function.
824	ServiceQueues,
825}
826
827impl<T: Config> Pallet<T> {
828	/// Knit `origin` into the ready ring right at the end.
829	///
830	/// Return the two ready ring neighbours of `origin`.
831	fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
832		if let Some(head) = ServiceHead::<T>::get() {
833			let mut head_book_state = BookStateFor::<T>::get(&head);
834			let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
835			let tail = head_neighbours.prev;
836			head_neighbours.prev = origin.clone();
837			head_book_state.ready_neighbours = Some(head_neighbours);
838			BookStateFor::<T>::insert(&head, head_book_state);
839
840			let mut tail_book_state = BookStateFor::<T>::get(&tail);
841			let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
842			tail_neighbours.next = origin.clone();
843			tail_book_state.ready_neighbours = Some(tail_neighbours);
844			BookStateFor::<T>::insert(&tail, tail_book_state);
845
846			Ok(Neighbours { next: head, prev: tail })
847		} else {
848			ServiceHead::<T>::put(origin);
849			Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
850		}
851	}
852
853	fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
854		if origin == &neighbours.next {
855			debug_assert!(
856				origin == &neighbours.prev,
857				"unknitting from single item ring; outgoing must be only item"
858			);
859			// Service queue empty.
860			ServiceHead::<T>::kill();
861		} else {
862			BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
863				if let Some(ref mut n) = book_state.ready_neighbours {
864					n.prev = neighbours.prev.clone()
865				}
866			});
867			BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
868				if let Some(ref mut n) = book_state.ready_neighbours {
869					n.next = neighbours.next.clone()
870				}
871			});
872			if let Some(head) = ServiceHead::<T>::get() {
873				if &head == origin {
874					ServiceHead::<T>::put(neighbours.next);
875				}
876			} else {
877				defensive!("`ServiceHead` must be some if there was a ready queue");
878			}
879		}
880	}
881
882	/// Tries to bump the current `ServiceHead` to the next ready queue.
883	///
884	/// Returns the current head if it got be bumped and `None` otherwise.
885	fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
886		if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
887			return None;
888		}
889
890		if let Some(head) = ServiceHead::<T>::get() {
891			let mut head_book_state = BookStateFor::<T>::get(&head);
892			if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
893				ServiceHead::<T>::put(&head_neighbours.next);
894				Some(head)
895			} else {
896				defensive!("The head must point to a queue in the ready ring");
897				None
898			}
899		} else {
900			None
901		}
902	}
903
904	fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
905		if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
906			return Err(());
907		}
908
909		// Ensure that we never set the head to an un-ready queue.
910		if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
911			ServiceHead::<T>::put(queue);
912			Ok(true)
913		} else {
914			Ok(false)
915		}
916	}
917
918	/// The maximal weight that a single message ever can consume.
919	///
920	/// Any message using more than this will be marked as permanently overweight and not
921	/// automatically re-attempted. Returns `None` if the servicing of a message cannot begin.
922	/// `Some(0)` means that only messages with no weight may be served.
923	fn max_message_weight(limit: Weight) -> Option<Weight> {
924		let service_weight = T::ServiceWeight::get().unwrap_or_default();
925		let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();
926
927		// Whatever weight is set, the one with the biggest one is used as the maximum weight. If a
928		// message is tried in one context and fails, it will be retried in the other context later.
929		let max_message_weight =
930			if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };
931
932		if max_message_weight.is_zero() {
933			// If no service weight is set, we need to use the given limit as max message weight.
934			limit.checked_sub(&Self::single_msg_overhead())
935		} else {
936			max_message_weight.checked_sub(&Self::single_msg_overhead())
937		}
938	}
939
940	/// The overhead of servicing a single message.
941	fn single_msg_overhead() -> Weight {
942		T::WeightInfo::bump_service_head()
943			.saturating_add(T::WeightInfo::service_queue_base())
944			.saturating_add(
945				T::WeightInfo::service_page_base_completion()
946					.max(T::WeightInfo::service_page_base_no_completion()),
947			)
948			.saturating_add(T::WeightInfo::service_page_item())
949			.saturating_add(T::WeightInfo::ready_ring_unknit())
950	}
951
952	/// Checks invariants of the pallet config.
953	///
954	/// The results of this can only be relied upon if the config values are set to constants.
955	#[cfg(test)]
956	fn do_integrity_test() -> Result<(), String> {
957		ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
958
959		let max_block = T::BlockWeights::get().max_block;
960
961		if let Some(service) = T::ServiceWeight::get() {
962			if Self::max_message_weight(service).is_none() {
963				return Err(format!(
964					"ServiceWeight too low: {}. Must be at least {}",
965					service,
966					Self::single_msg_overhead(),
967				));
968			}
969
970			if service.any_gt(max_block) {
971				return Err(format!(
972					"ServiceWeight {service} is bigger than max block weight {max_block}"
973				));
974			}
975		}
976
977		if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
978			if on_idle.any_gt(max_block) {
979				return Err(format!(
980					"IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
981				));
982			}
983		}
984
985		if let (Some(service_weight), Some(on_idle)) =
986			(T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
987		{
988			if !(service_weight.all_gt(on_idle) ||
989				on_idle.all_gt(service_weight) ||
990				service_weight == on_idle)
991			{
992				return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into());
993			}
994		}
995
996		Ok(())
997	}
998
999	fn do_enqueue_messages<'a>(
1000		origin: &MessageOriginOf<T>,
1001		messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
1002	) {
1003		let mut book_state = BookStateFor::<T>::get(origin);
1004
1005		let mut maybe_page = None;
1006		// Check if we already have a page in progress.
1007		if book_state.end > book_state.begin {
1008			debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
1009			maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
1010				defensive!("Corruption: referenced page doesn't exist.");
1011				None
1012			});
1013		}
1014
1015		for message in messages {
1016			// Try to append the message to the current page if possible.
1017			if let Some(mut page) = maybe_page {
1018				maybe_page = match page.try_append_message::<T>(message) {
1019					Ok(_) => Some(page),
1020					Err(_) => {
1021						// Not enough space on the current page.
1022						// Let's save it, since we'll move to a new one.
1023						Pages::<T>::insert(origin, book_state.end - 1, page);
1024						None
1025					},
1026				}
1027			}
1028			// If not, add it to a new page.
1029			if maybe_page.is_none() {
1030				book_state.end.saturating_inc();
1031				book_state.count.saturating_inc();
1032				maybe_page = Some(Page::from_message::<T>(message));
1033			}
1034
1035			// Account for the message that we just added.
1036			book_state.message_count.saturating_inc();
1037			book_state
1038				.size
1039				// This should be payload size, but here the payload *is* the message.
1040				.saturating_accrue(message.len() as u64);
1041		}
1042
1043		// Save the last page that we created.
1044		if let Some(page) = maybe_page {
1045			Pages::<T>::insert(origin, book_state.end - 1, page);
1046		}
1047
1048		// Insert book state for current origin into the ready queue.
1049		if book_state.ready_neighbours.is_none() {
1050			match Self::ready_ring_knit(origin) {
1051				Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
1052				Err(()) => {
1053					defensive!("Ring state invalid when knitting");
1054				},
1055			}
1056		}
1057
1058		// NOTE: `T::QueueChangeHandler` is called by the caller.
1059		BookStateFor::<T>::insert(origin, book_state);
1060	}
1061
1062	/// Try to execute a single message that was marked as overweight.
1063	///
1064	/// The `weight_limit` is the weight that can be consumed to execute the message. The base
1065	/// weight of the function it self must be measured by the caller.
1066	pub fn do_execute_overweight(
1067		origin: MessageOriginOf<T>,
1068		page_index: PageIndex,
1069		index: T::Size,
1070		weight_limit: Weight,
1071	) -> Result<Weight, Error<T>> {
1072		match with_service_mutex(|| {
1073			Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
1074		}) {
1075			Err(()) => Err(Error::<T>::RecursiveDisallowed),
1076			Ok(x) => x,
1077		}
1078	}
1079
1080	/// Same as `do_execute_overweight` but must be called while holding the `service_mutex`.
1081	fn do_execute_overweight_inner(
1082		origin: MessageOriginOf<T>,
1083		page_index: PageIndex,
1084		index: T::Size,
1085		weight_limit: Weight,
1086	) -> Result<Weight, Error<T>> {
1087		let mut book_state = BookStateFor::<T>::get(&origin);
1088		ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);
1089
1090		let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
1091		let (pos, is_processed, payload) =
1092			page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
1093		let payload_len = payload.len() as u64;
1094		ensure!(
1095			page_index < book_state.begin ||
1096				(page_index == book_state.begin && pos < page.first.into() as usize),
1097			Error::<T>::Queued
1098		);
1099		ensure!(!is_processed, Error::<T>::AlreadyProcessed);
1100		use MessageExecutionStatus::*;
1101		let mut weight_counter = WeightMeter::with_limit(weight_limit);
1102		match Self::process_message_payload(
1103			origin.clone(),
1104			page_index,
1105			index,
1106			payload,
1107			&mut weight_counter,
1108			Weight::MAX,
1109			// ^^^ We never recognise it as permanently overweight, since that would result in an
1110			// additional overweight event being deposited.
1111		) {
1112			Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
1113			StackLimitReached | Unprocessable { permanent: false } => {
1114				Err(Error::<T>::TemporarilyUnprocessable)
1115			},
1116			Unprocessable { permanent: true } | Processed => {
1117				page.note_processed_at_pos(pos);
1118				book_state.message_count.saturating_dec();
1119				book_state.size.saturating_reduce(payload_len);
1120				let page_weight = if page.remaining.is_zero() {
1121					debug_assert!(
1122						page.remaining_size.is_zero(),
1123						"no messages remaining; no space taken; qed"
1124					);
1125					Pages::<T>::remove(&origin, page_index);
1126					debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
1127					book_state.count.saturating_dec();
1128					T::WeightInfo::execute_overweight_page_removed()
1129				// no need to consider .first or ready ring since processing an overweight page
1130				// would not alter that state.
1131				} else {
1132					Pages::<T>::insert(&origin, page_index, page);
1133					T::WeightInfo::execute_overweight_page_updated()
1134				};
1135				BookStateFor::<T>::insert(&origin, &book_state);
1136				T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1137				Ok(weight_counter.consumed().saturating_add(page_weight))
1138			},
1139		}
1140	}
1141
1142	/// Remove a stale page or one which has no more messages remaining to be processed.
1143	fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1144		match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
1145			Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
1146			Ok(x) => x,
1147		}
1148	}
1149
1150	/// Same as `do_reap_page` but must be called while holding the `service_mutex`.
1151	fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1152		let mut book_state = BookStateFor::<T>::get(origin);
1153		// definitely not reapable if the page's index is no less than the `begin`ning of ready
1154		// pages.
1155		ensure!(page_index < book_state.begin, Error::<T>::NotReapable);
1156
1157		let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;
1158
1159		// definitely reapable if the page has no messages in it.
1160		let reapable = page.remaining.is_zero();
1161
1162		// also reapable if the page index has dropped below our watermark.
1163		let cullable = || {
1164			let total_pages = book_state.count;
1165			let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);
1166
1167			// The number of stale pages - i.e. pages which contain unprocessed overweight messages.
1168			// We would prefer to keep these around but will restrict how far into history they can
1169			// extend if we notice that there's too many of them.
1170			//
1171			// We don't know *where* in history these pages are so we use a dynamic formula which
1172			// reduces the historical time horizon as the stale pages pile up and increases it as
1173			// they reduce.
1174			let stale_pages = total_pages - ready_pages;
1175
1176			// The maximum number of stale pages (i.e. of overweight messages) allowed before
1177			// culling can happen at all. Once there are more stale pages than this, then historical
1178			// pages may be dropped, even if they contain unprocessed overweight messages.
1179			let max_stale = T::MaxStale::get();
1180
1181			// The amount beyond the maximum which are being used. If it's not beyond the maximum
1182			// then we exit now since no culling is needed.
1183			let overflow = match stale_pages.checked_sub(max_stale + 1) {
1184				Some(x) => x + 1,
1185				None => return false,
1186			};
1187
1188			// The special formula which tells us how deep into index-history we will pages. As
1189			// the overflow is greater (and thus the need to drop items from storage is more urgent)
1190			// this is reduced, allowing a greater range of pages to be culled.
1191			// With a minimum `overflow` (`1`), this returns `max_stale ** 2`, indicating we only
1192			// cull beyond that number of indices deep into history.
1193			// At this overflow increases, our depth reduces down to a limit of `max_stale`. We
1194			// never want to reduce below this since this will certainly allow enough pages to be
1195			// culled in order to bring `overflow` back to zero.
1196			let backlog = (max_stale * max_stale / overflow).max(max_stale);
1197
1198			let watermark = book_state.begin.saturating_sub(backlog);
1199			page_index < watermark
1200		};
1201		ensure!(reapable || cullable(), Error::<T>::NotReapable);
1202
1203		Pages::<T>::remove(origin, page_index);
1204		debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
1205		book_state.count.saturating_dec();
1206		book_state.message_count.saturating_reduce(page.remaining.into() as u64);
1207		book_state.size.saturating_reduce(page.remaining_size.into() as u64);
1208		BookStateFor::<T>::insert(origin, &book_state);
1209		T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
1210		Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });
1211
1212		Ok(())
1213	}
1214
1215	/// Execute any messages remaining to be processed in the queue of `origin`, using up to
1216	/// `weight_limit` to do so. Any messages which would take more than `overweight_limit` to
1217	/// execute are deemed overweight and ignored.
1218	fn service_queue(
1219		origin: MessageOriginOf<T>,
1220		weight: &mut WeightMeter,
1221		overweight_limit: Weight,
1222	) -> (bool, Option<MessageOriginOf<T>>) {
1223		use PageExecutionStatus::*;
1224		if weight
1225			.try_consume(
1226				T::WeightInfo::service_queue_base()
1227					.saturating_add(T::WeightInfo::ready_ring_unknit()),
1228			)
1229			.is_err()
1230		{
1231			return (false, None);
1232		}
1233
1234		let mut book_state = BookStateFor::<T>::get(&origin);
1235		let mut total_processed = 0;
1236		if T::QueuePausedQuery::is_paused(&origin) {
1237			let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1238			return (false, next_ready);
1239		}
1240
1241		while book_state.end > book_state.begin {
1242			let (processed, status) =
1243				Self::service_page(&origin, &mut book_state, weight, overweight_limit);
1244			total_processed.saturating_accrue(processed);
1245			match status {
1246				// Store the page progress and do not go to the next one.
1247				Bailed | NoProgress => break,
1248				// Go to the next page if this one is at the end.
1249				NoMore => (),
1250			};
1251			book_state.begin.saturating_inc();
1252		}
1253		let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1254		if book_state.begin >= book_state.end {
1255			// No longer ready - unknit.
1256			if let Some(neighbours) = book_state.ready_neighbours.take() {
1257				Self::ready_ring_unknit(&origin, neighbours);
1258			} else if total_processed > 0 {
1259				defensive!("Freshly processed queue must have been ready");
1260			}
1261		}
1262		BookStateFor::<T>::insert(&origin, &book_state);
1263		if total_processed > 0 {
1264			T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1265		}
1266		(total_processed > 0, next_ready)
1267	}
1268
1269	/// Service as many messages of a page as possible.
1270	///
1271	/// Returns how many messages were processed and the page's status.
1272	fn service_page(
1273		origin: &MessageOriginOf<T>,
1274		book_state: &mut BookStateOf<T>,
1275		weight: &mut WeightMeter,
1276		overweight_limit: Weight,
1277	) -> (u32, PageExecutionStatus) {
1278		use PageExecutionStatus::*;
1279		if weight
1280			.try_consume(
1281				T::WeightInfo::service_page_base_completion()
1282					.max(T::WeightInfo::service_page_base_no_completion()),
1283			)
1284			.is_err()
1285		{
1286			return (0, Bailed);
1287		}
1288
1289		let page_index = book_state.begin;
1290		let mut page = match Pages::<T>::get(origin, page_index) {
1291			Some(p) => p,
1292			None => {
1293				defensive!("message-queue: referenced page not found");
1294				return (0, NoMore);
1295			},
1296		};
1297
1298		let mut total_processed = 0;
1299
1300		// Execute as many messages as possible.
1301		let status = loop {
1302			use ItemExecutionStatus::*;
1303			match Self::service_page_item(
1304				origin,
1305				page_index,
1306				book_state,
1307				&mut page,
1308				weight,
1309				overweight_limit,
1310			) {
1311				Bailed => break PageExecutionStatus::Bailed,
1312				NoItem => break PageExecutionStatus::NoMore,
1313				NoProgress => break PageExecutionStatus::NoProgress,
1314				// Keep going as long as we make progress...
1315				Executed(true) => total_processed.saturating_inc(),
1316				Executed(false) => (),
1317			}
1318		};
1319
1320		if page.is_complete() {
1321			debug_assert!(status != Bailed, "we never bail if a page became complete");
1322			Pages::<T>::remove(origin, page_index);
1323			debug_assert!(book_state.count > 0, "completing a page implies there are pages");
1324			book_state.count.saturating_dec();
1325		} else {
1326			Pages::<T>::insert(origin, page_index, page);
1327		}
1328		(total_processed, status)
1329	}
1330
1331	/// Execute the next message of a page.
1332	pub(crate) fn service_page_item(
1333		origin: &MessageOriginOf<T>,
1334		page_index: PageIndex,
1335		book_state: &mut BookStateOf<T>,
1336		page: &mut PageOf<T>,
1337		weight: &mut WeightMeter,
1338		overweight_limit: Weight,
1339	) -> ItemExecutionStatus {
1340		use MessageExecutionStatus::*;
1341		// This ugly pre-checking is needed for the invariant
1342		// "we never bail if a page became complete".
1343		if page.is_complete() {
1344			return ItemExecutionStatus::NoItem;
1345		}
1346		if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
1347			return ItemExecutionStatus::Bailed;
1348		}
1349
1350		let payload = &match page.peek_first() {
1351			Some(m) => m,
1352			None => return ItemExecutionStatus::NoItem,
1353		}[..];
1354		let payload_len = payload.len() as u64;
1355
1356		// Store these for the case that `process_message_payload` is recursive.
1357		Pages::<T>::insert(origin, page_index, &*page);
1358		BookStateFor::<T>::insert(origin, &*book_state);
1359
1360		let res = Self::process_message_payload(
1361			origin.clone(),
1362			page_index,
1363			page.first_index,
1364			payload,
1365			weight,
1366			overweight_limit,
1367		);
1368
1369		// And restore them afterwards to see the changes of a recursive call.
1370		*book_state = BookStateFor::<T>::get(origin);
1371		if let Some(new_page) = Pages::<T>::get(origin, page_index) {
1372			*page = new_page;
1373		} else {
1374			defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
1375			return ItemExecutionStatus::NoItem;
1376		};
1377
1378		let is_processed = match res {
1379			InsufficientWeight => return ItemExecutionStatus::Bailed,
1380			Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
1381			Processed | Unprocessable { permanent: true } | StackLimitReached => true,
1382			Overweight => false,
1383		};
1384
1385		if is_processed {
1386			book_state.message_count.saturating_dec();
1387			book_state.size.saturating_reduce(payload_len as u64);
1388		}
1389		page.skip_first(is_processed);
1390		ItemExecutionStatus::Executed(is_processed)
1391	}
1392
1393	/// Ensure the correctness of state of this pallet.
1394	///
1395	/// # Assumptions-
1396	///
1397	/// If `serviceHead` points to a ready Queue, then BookState of that Queue has:
1398	///
1399	/// * `message_count` > 0
1400	/// * `size` > 0
1401	/// * `end` > `begin`
1402	/// * Some(ready_neighbours)
1403	/// * If `ready_neighbours.next` == self.origin, then `ready_neighbours.prev` == self.origin
1404	///   (only queue in ring)
1405	///
1406	/// For Pages(begin to end-1) in BookState:
1407	///
1408	/// * `remaining` > 0
1409	/// * `remaining_size` > 0
1410	/// * `first` <= `last`
1411	/// * Every page can be decoded into peek_* functions
1412	#[cfg(any(test, feature = "try-runtime", feature = "std"))]
1413	pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
1414		// Checking memory corruption for BookStateFor
1415		ensure!(
1416			BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
1417			"Memory Corruption in BookStateFor"
1418		);
1419		// Checking memory corruption for Pages
1420		ensure!(
1421			Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
1422			"Memory Corruption in Pages"
1423		);
1424
1425		// Basic checks for each book
1426		for book in BookStateFor::<T>::iter_values() {
1427			ensure!(book.end >= book.begin, "Invariant");
1428			ensure!(book.end < 1 << 30, "Likely overflow or corruption");
1429			ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
1430			ensure!(book.size < 1 << 30, "Likely overflow or corruption");
1431			ensure!(book.count < 1 << 30, "Likely overflow or corruption");
1432
1433			let fp: QueueFootprint = book.into();
1434			ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
1435		}
1436
1437		// loop around this origin
1438		let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };
1439
1440		while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
1441			ensure!(
1442				BookStateFor::<T>::contains_key(&head),
1443				"Service head must point to an existing book"
1444			);
1445
1446			let head_book_state = BookStateFor::<T>::get(&head);
1447			ensure!(
1448				head_book_state.message_count > 0,
1449				"There must be some messages if in ReadyRing"
1450			);
1451			ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
1452			ensure!(
1453				head_book_state.end > head_book_state.begin,
1454				"End > Begin if unprocessed messages exists"
1455			);
1456			ensure!(
1457				head_book_state.ready_neighbours.is_some(),
1458				"There must be neighbours if in ReadyRing"
1459			);
1460
1461			if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
1462				ensure!(
1463					head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
1464					"Can only happen if only queue in ReadyRing"
1465				);
1466			}
1467
1468			for page_index in head_book_state.begin..head_book_state.end {
1469				let page = Pages::<T>::get(&head, page_index).unwrap();
1470				let remaining_messages = page.remaining;
1471				let mut counted_remaining_messages: u32 = 0;
1472				ensure!(
1473					remaining_messages > 0.into(),
1474					"These must be some messages that have not been processed yet!"
1475				);
1476
1477				for i in 0..u32::MAX {
1478					if let Some((_, processed, _)) = page.peek_index(i as usize) {
1479						if !processed {
1480							counted_remaining_messages += 1;
1481						}
1482					} else {
1483						break;
1484					}
1485				}
1486
1487				ensure!(
1488					remaining_messages.into() == counted_remaining_messages,
1489					"Memory Corruption"
1490				);
1491			}
1492
1493			if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
1494				break;
1495			}
1496		}
1497		Ok(())
1498	}
1499
1500	/// Print the pages in each queue and the messages in each page.
1501	///
1502	/// Processed messages are prefixed with a `*` and the current `begin`ning page with a `>`.
1503	///
1504	/// # Example output
1505	///
1506	/// ```text
1507	/// queue Here:
1508	///   page 0: []
1509	/// > page 1: []
1510	///   page 2: ["\0weight=4", "\0c", ]
1511	///   page 3: ["\0bigbig 1", ]
1512	///   page 4: ["\0bigbig 2", ]
1513	///   page 5: ["\0bigbig 3", ]
1514	/// ```
1515	#[cfg(feature = "std")]
1516	pub fn debug_info() -> String {
1517		let mut info = String::new();
1518		for (origin, book_state) in BookStateFor::<T>::iter() {
1519			let mut queue = format!("queue {:?}:\n", &origin);
1520			let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
1521			pages.sort_by(|(a, _), (b, _)| a.cmp(b));
1522			for (page_index, mut page) in pages.into_iter() {
1523				let page_info = if book_state.begin == page_index { ">" } else { " " };
1524				let mut page_info = format!(
1525					"{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
1526					page_info, page_index, page.first, page.last, page.remaining
1527				);
1528				for i in 0..u32::MAX {
1529					if let Some((_, processed, message)) =
1530						page.peek_index(i.try_into().expect("std-only code"))
1531					{
1532						let msg = String::from_utf8_lossy(message);
1533						if processed {
1534							page_info.push('*');
1535						}
1536						page_info.push_str(&format!("{:?}, ", msg));
1537						page.skip_first(true);
1538					} else {
1539						break;
1540					}
1541				}
1542				page_info.push_str("]\n");
1543				queue.push_str(&page_info);
1544			}
1545			info.push_str(&queue);
1546		}
1547		info
1548	}
1549
1550	/// Process a single message.
1551	///
1552	/// The base weight of this function needs to be accounted for by the caller. `weight` is the
1553	/// remaining weight to process the message. `overweight_limit` is the maximum weight that a
1554	/// message can ever consume. Messages above this limit are marked as permanently overweight.
1555	/// This process is also transactional, any form of error that occurs in processing a message
1556	/// causes storage changes to be rolled back.
1557	fn process_message_payload(
1558		origin: MessageOriginOf<T>,
1559		page_index: PageIndex,
1560		message_index: T::Size,
1561		message: &[u8],
1562		meter: &mut WeightMeter,
1563		overweight_limit: Weight,
1564	) -> MessageExecutionStatus {
1565		let mut id = sp_io::hashing::blake2_256(message);
1566		use ProcessMessageError::*;
1567		let prev_consumed = meter.consumed();
1568
1569		let transaction =
1570			storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
1571				let res =
1572					T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
1573				match &res {
1574					Ok(_) => TransactionOutcome::Commit(Ok(res)),
1575					Err(_) => TransactionOutcome::Rollback(Ok(res)),
1576				}
1577			});
1578
1579		let transaction = match transaction {
1580			Ok(result) => result,
1581			_ => {
1582				defensive!(
1583					"Error occurred processing message, storage changes will be rolled back"
1584				);
1585				return MessageExecutionStatus::Unprocessable { permanent: true };
1586			},
1587		};
1588
1589		match transaction {
1590			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
1591				// Permanently overweight.
1592				Self::deposit_event(Event::<T>::OverweightEnqueued {
1593					id,
1594					origin,
1595					page_index,
1596					message_index,
1597				});
1598				MessageExecutionStatus::Overweight
1599			},
1600			Err(Overweight(_)) => {
1601				// Temporarily overweight - save progress and stop processing this
1602				// queue.
1603				MessageExecutionStatus::InsufficientWeight
1604			},
1605			Err(Yield) => {
1606				// Processing should be reattempted later.
1607				MessageExecutionStatus::Unprocessable { permanent: false }
1608			},
1609			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
1610				// Permanent error - drop
1611				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1612				MessageExecutionStatus::Unprocessable { permanent: true }
1613			},
1614			Err(error @ StackLimitReached) => {
1615				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1616				MessageExecutionStatus::StackLimitReached
1617			},
1618			Ok(success) => {
1619				// Success
1620				let weight_used = meter.consumed().saturating_sub(prev_consumed);
1621				Self::deposit_event(Event::<T>::Processed {
1622					id: id.into(),
1623					origin,
1624					weight_used,
1625					success,
1626				});
1627				MessageExecutionStatus::Processed
1628			},
1629		}
1630	}
1631
1632	fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
1633		let mut weight = WeightMeter::with_limit(weight_limit);
1634
1635		// Get the maximum weight that processing a single message may take:
1636		let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
1637			if matches!(context, ServiceQueuesContext::OnInitialize) {
1638				defensive!("Not enough weight to service a single message.");
1639			}
1640			Weight::zero()
1641		});
1642
1643		match with_service_mutex(|| {
1644			let mut next = match Self::bump_service_head(&mut weight) {
1645				Some(h) => h,
1646				None => return weight.consumed(),
1647			};
1648			// The last queue that did not make any progress.
1649			// The loop aborts as soon as it arrives at this queue again without making any progress
1650			// on other queues in between.
1651			let mut last_no_progress = None;
1652
1653			loop {
1654				let (progressed, n) =
1655					Self::service_queue(next.clone(), &mut weight, overweight_limit);
1656				next = match n {
1657					Some(n) => {
1658						if !progressed {
1659							if last_no_progress == Some(n.clone()) {
1660								break;
1661							}
1662							if last_no_progress.is_none() {
1663								last_no_progress = Some(next.clone())
1664							}
1665							n
1666						} else {
1667							last_no_progress = None;
1668							n
1669						}
1670					},
1671					None => break,
1672				}
1673			}
1674			weight.consumed()
1675		}) {
1676			Err(()) => weight.consumed(),
1677			Ok(w) => w,
1678		}
1679	}
1680}
1681
1682impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pallet<T> {
1683	fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
1684		Pallet::<T>::set_service_head(weight, origin)
1685	}
1686}
1687
1688/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues.
1689pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
1690	// Holds the singleton token instance.
1691	environmental::environmental!(token: Option<()>);
1692
1693	token::using_once(&mut Some(()), || {
1694		// The first `ok_or` should always be `Ok` since we are inside a `using_once`.
1695		let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;
1696
1697		// Put the token back when we're done.
1698		defer! {
1699			token::with(|t| {
1700				*t = Some(hold);
1701			});
1702		}
1703
1704		Ok(f())
1705	})
1706}
1707
1708/// Provides a [`sp_core::Get`] to access the `MEL` of a [`codec::MaxEncodedLen`] type.
1709pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
1710impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
1711	fn get() -> u32 {
1712		T::max_encoded_len() as u32
1713	}
1714}
1715
1716/// Calculates the maximum message length and exposed it through the [`codec::MaxEncodedLen`] trait.
1717pub struct MaxMessageLen<Origin, Size, HeapSize>(
1718	core::marker::PhantomData<(Origin, Size, HeapSize)>,
1719);
1720impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
1721	for MaxMessageLen<Origin, Size, HeapSize>
1722{
1723	fn get() -> u32 {
1724		(HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
1725	}
1726}
1727
1728/// The maximal message length.
1729pub type MaxMessageLenOf<T> =
1730	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
1731/// The maximal encoded origin length.
1732pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
1733/// The `MessageOrigin` of this pallet.
1734pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
1735/// The maximal heap size of a page.
1736pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
1737/// The [`Page`] of this pallet.
1738pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
1739/// The [`BookState`] of this pallet.
1740pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;
1741
1742/// Converts a [`sp_core::Get`] with returns a type that can be cast into an `u32` into a `Get`
1743/// which returns an `u32`.
1744pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
1745impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
1746	fn get() -> u32 {
1747		T::get().into()
1748	}
1749}
1750
1751impl<T: Config> ServiceQueues for Pallet<T> {
1752	type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);
1753
1754	fn service_queues(weight_limit: Weight) -> Weight {
1755		Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
1756	}
1757
1758	/// Execute a single overweight message.
1759	///
1760	/// The weight limit must be enough for `execute_overweight` and the message execution itself.
1761	fn execute_overweight(
1762		weight_limit: Weight,
1763		(message_origin, page, index): Self::OverweightMessageAddress,
1764	) -> Result<Weight, ExecuteOverweightError> {
1765		let mut weight = WeightMeter::with_limit(weight_limit);
1766		if weight
1767			.try_consume(
1768				T::WeightInfo::execute_overweight_page_removed()
1769					.max(T::WeightInfo::execute_overweight_page_updated()),
1770			)
1771			.is_err()
1772		{
1773			return Err(ExecuteOverweightError::InsufficientWeight);
1774		}
1775
1776		Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
1777			|e| match e {
1778				Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
1779				Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
1780				Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
1781				Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued => {
1782					ExecuteOverweightError::NotFound
1783				},
1784				Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
1785				_ => ExecuteOverweightError::Other,
1786			},
1787		)
1788	}
1789}
1790
1791impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
1792	type MaxMessageLen =
1793		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;
1794
1795	fn enqueue_message(
1796		message: BoundedSlice<u8, Self::MaxMessageLen>,
1797		origin: <T::MessageProcessor as ProcessMessage>::Origin,
1798	) {
1799		Self::do_enqueue_messages(&origin, [message].into_iter());
1800		let book_state = BookStateFor::<T>::get(&origin);
1801		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1802	}
1803
1804	fn enqueue_messages<'a>(
1805		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
1806		origin: <T::MessageProcessor as ProcessMessage>::Origin,
1807	) {
1808		Self::do_enqueue_messages(&origin, messages);
1809		let book_state = BookStateFor::<T>::get(&origin);
1810		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1811	}
1812
1813	fn sweep_queue(origin: MessageOriginOf<T>) {
1814		if !BookStateFor::<T>::contains_key(&origin) {
1815			return;
1816		}
1817		let mut book_state = BookStateFor::<T>::get(&origin);
1818		book_state.begin = book_state.end;
1819		if let Some(neighbours) = book_state.ready_neighbours.take() {
1820			Self::ready_ring_unknit(&origin, neighbours);
1821		}
1822		BookStateFor::<T>::insert(&origin, &book_state);
1823	}
1824}
1825
1826impl<T: Config> QueueFootprintQuery<MessageOriginOf<T>> for Pallet<T> {
1827	type MaxMessageLen =
1828		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;
1829
1830	fn get_batches_footprints<'a>(
1831		origin: MessageOriginOf<T>,
1832		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
1833		total_pages_limit: u32,
1834	) -> BatchesFootprints {
1835		let mut batches_footprints = BatchesFootprints::default();
1836
1837		let mut new_page = false;
1838		let mut total_pages_count = 0;
1839		let mut current_page_pos: usize = T::HeapSize::get().into() as usize;
1840
1841		let book = BookStateFor::<T>::get(&origin);
1842		if book.end > book.begin {
1843			total_pages_count = book.end - book.begin;
1844			if let Some(page) = Pages::<T>::get(origin, book.end - 1) {
1845				current_page_pos = page.heap_pos();
1846				batches_footprints.first_page_pos = current_page_pos;
1847			}
1848		}
1849
1850		let mut msgs = msgs.peekable();
1851		while let Some(msg) = msgs.peek() {
1852			if total_pages_count > total_pages_limit {
1853				return batches_footprints;
1854			}
1855
1856			match Page::<T::Size, T::HeapSize>::can_append_message_at(current_page_pos, msg.len()) {
1857				Ok(new_pos) => {
1858					current_page_pos = new_pos;
1859					batches_footprints.push(msg, new_page);
1860					new_page = false;
1861					msgs.next();
1862				},
1863				Err(_) => {
1864					// Would not fit into the current page.
1865					// We start a new one and try again in the next iteration.
1866					new_page = true;
1867					total_pages_count += 1;
1868					current_page_pos = 0;
1869				},
1870			}
1871		}
1872
1873		batches_footprints
1874	}
1875
1876	fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
1877		BookStateFor::<T>::get(&origin).into()
1878	}
1879}