// This file is part of Bizinikiwi.

// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// 	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! # Generalized Message Queue Pezpallet
//!
//! Provides generalized message queuing and processing capabilities on a per-queue basis for
//! arbitrary use-cases.
//!
//! # Design Goals
//!
//! 1. Minimal assumptions about `Message`s and `MessageOrigin`s. Both should be MEL bounded blobs.
//!    This ensures the generality and reusability of the pezpallet.
//! 2. Well known and tightly limited pre-dispatch PoV weights, especially for message execution.
//!    This is paramount for the success of the pezpallet since message execution is done in
//!    `on_initialize` which must _never_ under-estimate its PoV weight. It also needs a frugal PoV
//!    footprint since PoV is scarce and this is (possibly) done in every block. This must also hold
//!    in the presence of unpredictable message size distributions.
//! 3. Usable as XCMP, DMP and UMP message/dispatch queue - possibly through adapter types.
//!
//! # Design
//!
//! The pezpallet has means to enqueue, store and process messages. This is implemented by having
//! *queues* which store enqueued messages and can be *served* to process said messages. A queue is
//! identified by its origin in the `BookStateFor`. Each message has an origin which defines into
//! which queue it will be stored. Messages are stored by being appended to the last [`Page`] of a
//! book. Each book keeps track of its pages by indexing `Pages`. The `ReadyRing` contains all
//! queues which hold at least one unprocessed message and are thereby *ready* to be serviced. The
//! `ServiceHead` indicates which *ready* queue is the next to be serviced.
//! The pezpallet implements [`pezframe_support::traits::EnqueueMessage`],
//! [`pezframe_support::traits::ServiceQueues`] and has [`pezframe_support::traits::ProcessMessage`]
//! and [`OnQueueChanged`] hooks to communicate with the outside world.
//!
//! NOTE: The storage items are not linked since they are not public.
//!
//! **Message Execution**
//!
//! Executing a message is offloaded to the [`Config::MessageProcessor`] which contains the actual
//! logic of how to handle the message since they are blobs. Storage changes are not rolled back on
//! error.
//!
//! A failed message can be temporarily or permanently overweight. The pezpallet will perpetually
//! try to execute a temporarily overweight message. A permanently overweight message is skipped and
//! must be executed manually.
//!
//! **Reentrancy**
//!
//! This pezpallet has two entry points for executing (possibly recursive) logic;
//! [`Pezpallet::service_queues`] and [`Pezpallet::execute_overweight`]. Both entry points are
//! guarded by the same mutex to error on reentrancy. The only functions that are explicitly
//! **allowed** to be called by a message processor are: [`Pezpallet::enqueue_message`] and
//! [`Pezpallet::enqueue_messages`]. All other functions are forbidden and error with
//! [`Error::RecursiveDisallowed`].
//!
//! **Pagination**
//!
//! Queues are stored in a *paged* manner by splitting their messages into [`Page`]s. This results
//! in a lot of complexity when implementing the pezpallet but is completely necessary to achieve
//! the second [Design Goal](#design-goals). The problem comes from the fact that a message can
//! *possibly* be quite large, let's say 64KiB. This then results in a *MEL* of at least 64KiB which
//! results in a PoV of at least 64KiB. Now we have the assumption that most messages are much
//! shorter than their maximum allowed length. This would result in most messages having a
//! pre-dispatch PoV size which is much larger than their post-dispatch PoV size, possibly by a
//! factor of a thousand. Disregarding this observation would cripple the processing power of the
//! pezpallet since it cannot straighten out this discrepancy at runtime. Conceptually, the
//! implementation packs as many messages into a single bounded vec as actually fit into the
//! bounds. This reduces the wasted PoV.
//!
//! **Page Data Layout**
//!
//! A Page contains a heap which holds all its messages. The heap is built by concatenating
//! `(ItemHeader, Message)` pairs. The [`ItemHeader`] contains the length of the message which is
//! needed for retrieving it. This layout allows for constant access time of the next message and
//! linear access time for any message in the page. The header must remain minimal to reduce its PoV
//! impact.
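//!
//! A simplified sketch of this layout (hypothetical: a fixed-width five-byte header standing in
//! for the actual SCALE-encoded [`ItemHeader`]):
//!
//! ```rust
//! // Hypothetical header: 4-byte little-endian payload length + 1-byte `is_processed` flag.
//! fn push(heap: &mut Vec<u8>, msg: &[u8]) {
//!     heap.extend_from_slice(&(msg.len() as u32).to_le_bytes());
//!     heap.push(0); // is_processed = false
//!     heap.extend_from_slice(msg);
//! }
//! // Read the message whose header starts at `pos`, returning `(payload, next_pos)`.
//! fn read(heap: &[u8], pos: usize) -> (&[u8], usize) {
//!     let len = u32::from_le_bytes(heap[pos..pos + 4].try_into().unwrap()) as usize;
//!     let start = pos + 5;
//!     (&heap[start..start + len], start + len)
//! }
//!
//! let mut heap = Vec::new();
//! push(&mut heap, b"hello");
//! push(&mut heap, b"world!");
//! let (first, next) = read(&heap, 0);
//! assert_eq!(first, &b"hello"[..]);
//! assert_eq!(read(&heap, next).0, &b"world!"[..]);
//! ```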
//!
//! **Weight Metering**
//!
//! The pezpallet utilizes the [`pezsp_weights::WeightMeter`] to manually track its consumption to
//! always stay within the required limit. This implies that the message processor hook can
//! calculate the weight of a message without executing it. This restricts the possible use-cases
//! but is necessary since the pezpallet runs in `on_initialize` which has a hard weight limit. The
//! weight meter is used in a way that `can_accrue` and `check_accrue` are always used to check the
//! remaining weight of an operation before committing to it. The process of exiting due to
//! insufficient weight is termed "bailing".
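//!
//! The check-then-commit pattern can be sketched with a minimal stand-in meter (hypothetical,
//! not the actual [`pezsp_weights::WeightMeter`] API):
//!
//! ```rust
//! struct Meter { limit: u64, consumed: u64 }
//! impl Meter {
//!     /// Consume `w` only if it fits entirely into the remaining limit.
//!     fn check_accrue(&mut self, w: u64) -> bool {
//!         if self.consumed + w > self.limit {
//!             return false; // "bailing": exit before committing to the operation
//!         }
//!         self.consumed += w;
//!         true
//!     }
//! }
//!
//! let mut meter = Meter { limit: 10, consumed: 0 };
//! assert!(meter.check_accrue(7));
//! assert!(!meter.check_accrue(4)); // would overshoot the limit: bail, nothing is consumed
//! assert_eq!(meter.consumed, 7);
//! ```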
//!
//! # Scenario: Message enqueuing
//!
//! A message `m` is enqueued for origin `o` into queue `Q[o]` through
//! [`pezframe_support::traits::EnqueueMessage::enqueue_message`]`(m, o)`.
//!
//! First the queue is either loaded if it exists or otherwise created with empty default values.
//! The message is then inserted into the queue by appending it to its last `Page` or by creating a
//! new `Page` just for `m` if it does not fit in there. The number of messages in the `Book` is
//! incremented.
//!
//! `Q[o]` is now *ready* which will eventually result in `m` being processed.
//!
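//! The append-or-create decision can be sketched as follows (hypothetical: page fill levels as
//! plain byte counts, with stand-ins for [`Config::HeapSize`] and the encoded [`ItemHeader`]
//! length):
//!
//! ```rust
//! const HEAP_SIZE: usize = 16; // stand-in for `Config::HeapSize`
//! const HEADER_LEN: usize = 5; // stand-in for `ItemHeader::max_encoded_len()`
//!
//! /// Returns the index of the page the message was placed into.
//! fn enqueue(pages: &mut Vec<usize>, msg_len: usize) -> usize {
//!     let item_len = HEADER_LEN + msg_len;
//!     match pages.last_mut() {
//!         // Append to the last page if the item still fits into its heap.
//!         Some(used) if *used + item_len <= HEAP_SIZE => *used += item_len,
//!         // Otherwise open a new page just for this message.
//!         _ => pages.push(item_len),
//!     }
//!     pages.len() - 1
//! }
//!
//! let mut pages = Vec::new();
//! assert_eq!(enqueue(&mut pages, 3), 0); // first page is created
//! assert_eq!(enqueue(&mut pages, 3), 0); // still fits into page 0
//! assert_eq!(enqueue(&mut pages, 3), 1); // page 0 is full: a new page is created
//! ```
//!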
//! # Scenario: Message processing
//!
//! The pezpallet runs each block in `on_initialize` or when being manually called through
//! [`pezframe_support::traits::ServiceQueues::service_queues`].
//!
//! First it tries to "rotate" the `ReadyRing` by one through advancing the `ServiceHead` to the
//! next *ready* queue. It then starts to service this queue by servicing as many pages of it as
//! possible. Servicing a page means to execute as many messages of it as possible. Each executed
//! message is marked as *processed* if the [`Config::MessageProcessor`] returns `Ok`. An event
//! [`Event::Processed`] is emitted afterwards. It is possible that the weight limit of the
//! pezpallet will never allow a specific message to be executed. In this case it remains
//! unprocessed and is skipped. This process stops if either there are no more messages in the queue
//! or the remaining weight became insufficient to service this queue. If there is enough weight it
//! tries to advance to the next *ready* queue and service it. This continues until there are no
//! more queues on which it can make progress or there is not enough weight left to check that.
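//!
//! This servicing loop can be sketched as follows (hypothetical: queues as plain lists of
//! per-message weight costs, drained against a weight budget):
//!
//! ```rust
//! /// Service all queues, returning the number of processed messages.
//! fn service(queues: &mut Vec<Vec<u64>>, mut weight: u64) -> u64 {
//!     let mut processed = 0;
//!     let mut progress = true;
//!     while progress {
//!         progress = false;
//!         for queue in queues.iter_mut() {
//!             // Execute as many messages of this queue as the remaining budget allows.
//!             while let Some(&cost) = queue.first() {
//!                 if cost > weight { break } // bail on this queue
//!                 weight -= cost;
//!                 queue.remove(0);
//!                 processed += 1;
//!                 progress = true;
//!             }
//!         }
//!     }
//!     processed
//! }
//!
//! let mut queues = vec![vec![2, 2], vec![3]];
//! // Both messages of the first queue fit into the budget; the 3-cost message must wait.
//! assert_eq!(service(&mut queues, 5), 2);
//! ```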
//!
//! # Scenario: Overweight execution
//!
//! A permanently overweight message which was skipped by the message processing will never be
//! executed automatically through `on_initialize` nor by calling
//! [`pezframe_support::traits::ServiceQueues::service_queues`].
//!
//! Manual intervention in the form of
//! [`pezframe_support::traits::ServiceQueues::execute_overweight`] is necessary. Overweight
//! messages emit an [`Event::OverweightEnqueued`] event which can be used to extract the arguments
//! for manual execution. This only works on permanently overweight messages. There is no guarantee
//! that this will work since the message could be part of a stale page and be reaped before
//! execution commences.
//!
//! # Terminology
//!
//! - `Message`: A blob of data into which the pezpallet has no introspection, defined as
//!   [`BoundedSlice<u8, MaxMessageLenOf<T>>`]. The message length is limited by [`MaxMessageLenOf`]
//!   which is calculated from [`Config::HeapSize`] and [`ItemHeader::max_encoded_len()`].
//! - `MessageOrigin`: A generic *origin* of a message, defined as [`MessageOriginOf`]. The
//!   requirements for it are kept minimal to remain as generic as possible. The type is defined in
//!   [`pezframe_support::traits::ProcessMessage::Origin`].
//! - `Page`: An array of `Message`s, see [`Page`]. Can never be empty.
//! - `Book`: A list of `Page`s, see [`BookState`]. Can be empty.
//! - `Queue`: A `Book` together with a `MessageOrigin` which can be part of the `ReadyRing`. Can
//!   be empty.
//! - `ReadyRing`: A double-linked list which contains all *ready* `Queue`s. It chains together the
//!   queues via their `ready_neighbours` fields. A `Queue` is *ready* if it contains at least one
//!   `Message` which can be processed. Can be empty.
//! - `ServiceHead`: A pointer into the `ReadyRing` to the next `Queue` to be serviced.
//! - (`un`)`processed`: A message is marked as *processed* after it was executed by the pezpallet.
//!   A message which was either: not yet executed or could not be executed remains as `unprocessed`
//!   which is the default state for a message after being enqueued.
//! - `knitting`/`unknitting`: The means of adding or removing a `Queue` from the `ReadyRing`.
//! - `MEL`: The Max Encoded Length of a type, see [`codec::MaxEncodedLen`].
//! - `Reentrance`: To enter an execution context again before it has completed.
//!
//! # Properties
//!
//! **Liveness - Enqueueing**
//!
//! It is always possible to enqueue any message for any `MessageOrigin`.
//!
//! **Liveness - Processing**
//!
//! `on_initialize` always respects its finite weight-limit.
//!
//! **Progress - Enqueueing**
//!
//! An enqueued message immediately becomes *unprocessed* and thereby eligible for execution.
//!
//! **Progress - Processing**
//!
//! The pezpallet will execute at least one unprocessed message per block, if there is any. Ensuring
//! this property needs careful consideration of the concrete weights, since it is possible that the
//! weight limit of `on_initialize` never allows for the execution of even one message; trivially if
//! the limit is set to zero. `integrity_test` can be used to ensure that this property holds.
//!
//! **Fairness - Enqueueing**
//!
//! Enqueueing a message for a specific `MessageOrigin` does not influence the ability to enqueue a
//! message for the same or any other `MessageOrigin`; guaranteed by **Liveness - Enqueueing**.
//!
//! **Fairness - Processing**
//!
//! The average amount of weight available for message processing is the same for each queue if the
//! number of queues is constant. Creating a new queue must therefore be, possibly economically,
//! expensive. Currently this is achieved by having one queue per para-chain/thread, which keeps the
//! number of queues within `O(n)` and should be "good enough".

#![deny(missing_docs)]
#![cfg_attr(not(feature = "std"), no_std)]

mod benchmarking;
mod integration_test;
mod mock;
pub mod mock_helpers;
mod tests;
pub mod weights;

extern crate alloc;

use alloc::vec::Vec;
use codec::{Codec, ConstEncodedLen, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
use core::{fmt::Debug, ops::Deref};
use pezframe_support::{
	defensive,
	pezpallet_prelude::*,
	traits::{
		BatchesFootprints, Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
		ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
		QueueFootprintQuery, QueuePausedQuery, ServiceQueues,
	},
	BoundedSlice, CloneNoBound, DefaultNoBound,
};
use pezframe_system::pezpallet_prelude::*;
pub use pezpallet::*;
use pezsp_arithmetic::traits::{BaseArithmetic, Unsigned};
use pezsp_core::{defer, H256};
use pezsp_runtime::{
	traits::{One, Zero},
	SaturatedConversion, Saturating, TransactionOutcome,
};
use pezsp_weights::WeightMeter;
use scale_info::TypeInfo;
pub use weights::WeightInfo;

/// Type for identifying a page.
type PageIndex = u32;

/// Data encoded and prefixed to the encoded `MessageItem`.
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
	/// The length of this item, not including the size of this header. The next item of the page
	/// follows immediately after the payload of this item.
	payload_len: Size,
	/// Whether this item has been processed.
	is_processed: bool,
}

impl<Size: ConstEncodedLen> ConstEncodedLen for ItemHeader<Size> {} // marker

/// A page of messages. Pages always contain at least one item.
#[derive(
	CloneNoBound, Encode, Decode, RuntimeDebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen,
)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
	/// Messages remaining to be processed; this includes overweight messages which have been
	/// skipped.
	remaining: Size,
	/// The size of all remaining messages to be processed.
	///
	/// Includes overweight messages outside of the `first` to `last` window.
	remaining_size: Size,
	/// The number of items before the `first` item in this page.
	first_index: Size,
	/// The heap-offset of the header of the first message item in this page which is ready for
	/// processing.
	first: Size,
	/// The heap-offset of the header of the last message item in this page.
	last: Size,
	/// The heap.
	heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}

impl<
		Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
		HeapSize: Get<Size>,
	> Page<Size, HeapSize>
where
	ItemHeader<Size>: ConstEncodedLen,
{
	/// Create a [`Page`] from one unprocessed message.
	fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
		let payload_len = message.len();
		let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
		let payload_len = payload_len.saturated_into();
		let header = ItemHeader::<Size> { payload_len, is_processed: false };

		let mut heap = Vec::with_capacity(data_len);
		header.using_encoded(|h| heap.extend_from_slice(h));
		heap.extend_from_slice(message.deref());

		Page {
			remaining: One::one(),
			remaining_size: payload_len,
			first_index: Zero::zero(),
			first: Zero::zero(),
			last: Zero::zero(),
			heap: BoundedVec::defensive_truncate_from(heap),
		}
	}

	/// The heap position where a new message can be appended to the current page.
	fn heap_pos(&self) -> usize {
		// The heap is actually a `Vec`, so the place where we can append data
		// is the end of the `Vec`.
		self.heap.len()
	}

	/// Check if a message can be appended to the current page at the provided heap position.
	///
	/// On success, returns the resulting position in the page where a new payload can be
	/// appended.
	fn can_append_message_at(pos: usize, message_len: usize) -> Result<usize, ()> {
		let header_size = ItemHeader::<Size>::max_encoded_len();
		let data_len = header_size.saturating_add(message_len);
		let heap_size = HeapSize::get().into() as usize;
		let new_pos = pos.saturating_add(data_len);
		if new_pos <= heap_size {
			Ok(new_pos)
		} else {
			Err(())
		}
	}

	/// Try to append one message to a page.
	fn try_append_message<T: Config>(
		&mut self,
		message: BoundedSlice<u8, MaxMessageLenOf<T>>,
	) -> Result<(), ()> {
		let pos = self.heap_pos();
		Self::can_append_message_at(pos, message.len())?;
		let payload_len = message.len().saturated_into();
		let header = ItemHeader::<Size> { payload_len, is_processed: false };

		let mut heap = core::mem::take(&mut self.heap).into_inner();
		header.using_encoded(|h| heap.extend_from_slice(h));
		heap.extend_from_slice(message.deref());
		self.heap = BoundedVec::defensive_truncate_from(heap);
		self.last = pos.saturated_into();
		self.remaining.saturating_inc();
		self.remaining_size.saturating_accrue(payload_len);
		Ok(())
	}

	/// Returns the first message in the page without removing it.
	///
	/// SAFETY: Does not panic even on corrupted storage.
	fn peek_first(&self) -> Option<BoundedSlice<'_, u8, IntoU32<HeapSize, Size>>> {
		if self.first > self.last {
			return None;
		}
		let f = (self.first.into() as usize).min(self.heap.len());
		let mut item_slice = &self.heap[f..];
		if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
			let payload_len = h.payload_len.into() as usize;
			if payload_len <= item_slice.len() {
				// Impossible to truncate since it is sliced from `self.heap: BoundedVec<u8,
				// HeapSize>`.
				return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]));
			}
		}
		defensive!("message-queue: heap corruption");
		None
	}

	/// Point `first` at the next message, marking the first as processed if `is_processed` is true.
	fn skip_first(&mut self, is_processed: bool) {
		let f = (self.first.into() as usize).min(self.heap.len());
		if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
			if is_processed && !h.is_processed {
				h.is_processed = true;
				h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
				self.remaining.saturating_dec();
				self.remaining_size.saturating_reduce(h.payload_len);
			}
			self.first
				.saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
			self.first.saturating_accrue(h.payload_len);
			self.first_index.saturating_inc();
		}
	}

	/// Return the message with index `index` in the form of `(position, processed, message)`.
	fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
		let mut pos = 0;
		let mut item_slice = &self.heap[..];
		let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
		for _ in 0..index {
			let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
			let item_len = h.payload_len.into() as usize;
			if item_slice.len() < item_len {
				return None;
			}
			item_slice = &item_slice[item_len..];
			pos.saturating_accrue(header_len.saturating_add(item_len));
		}
		let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
		if item_slice.len() < h.payload_len.into() as usize {
			return None;
		}
		item_slice = &item_slice[..h.payload_len.into() as usize];
		Some((pos, h.is_processed, item_slice))
	}

	/// Set the `is_processed` flag for the item at `pos` to be `true` if not already and decrement
	/// the `remaining` counter of the page.
	///
	/// Does nothing if no [`ItemHeader`] could be decoded at the given position.
	fn note_processed_at_pos(&mut self, pos: usize) {
		if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
			if !h.is_processed {
				h.is_processed = true;
				h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
				self.remaining.saturating_dec();
				self.remaining_size.saturating_reduce(h.payload_len);
			}
		}
	}

	/// Returns whether the page is *complete* which means that no messages remain.
	fn is_complete(&self) -> bool {
		self.remaining.is_zero()
	}
}

/// A single link in the double-linked Ready Ring list.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
	/// The previous queue.
	prev: MessageOrigin,
	/// The next queue.
	next: MessageOrigin,
}

/// The state of a queue as represented by a book of its pages.
///
/// Each queue has exactly one book which holds all of its pages. All pages of a book combined
/// contain all of the messages of its queue; hence the name *Book*.
/// Books can be chained together in a double-linked fashion through their `ready_neighbours` field.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, RuntimeDebug)]
pub struct BookState<MessageOrigin> {
	/// The first page with some items to be processed in it. If this is `>= end`, then there are
	/// no pages with items to be processed in them.
	begin: PageIndex,
	/// One more than the last page with some items to be processed in it.
	end: PageIndex,
	/// The number of pages stored at present.
	///
	/// This might be larger than `end-begin`, because we keep pages with unprocessed overweight
	/// messages outside of the end/begin window.
	count: PageIndex,
	/// If this book has any ready pages, then this will be `Some` with the previous and next
	/// neighbours. This wraps around.
	ready_neighbours: Option<Neighbours<MessageOrigin>>,
	/// The number of unprocessed messages stored at present.
	message_count: u64,
	/// The total size of all unprocessed messages stored at present.
	size: u64,
}

impl<MessageOrigin> Default for BookState<MessageOrigin> {
	fn default() -> Self {
		Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
	}
}

impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
	fn from(book: BookState<MessageOrigin>) -> Self {
		QueueFootprint {
			pages: book.count,
			ready_pages: book.end.defensive_saturating_sub(book.begin),
			storage: Footprint { count: book.message_count, size: book.size },
		}
	}
}

/// Handler code for when the items in a queue change.
pub trait OnQueueChanged<Id> {
	/// Note that the queue `id` now has `item_count` items in it, taking up `items_size` bytes.
	fn on_queue_changed(id: Id, fp: QueueFootprint);
}

impl<Id> OnQueueChanged<Id> for () {
	fn on_queue_changed(_: Id, _: QueueFootprint) {}
}

/// Allows forcing the processing head to a specific queue.
pub trait ForceSetHead<O> {
	/// Set the `ServiceHead` to `origin`.
	///
	/// This function returns:
	/// - `Err`: Queue did not exist, not enough weight or other error.
	/// - `Ok(true)`: The service head was updated.
	/// - `Ok(false)`: The service head was not updated since the queue is empty.
	fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}

#[pezframe_support::pezpallet]
pub mod pezpallet {
	use super::*;

	#[pezpallet::pezpallet]
	pub struct Pezpallet<T>(_);

	/// The module configuration trait.
	#[pezpallet::config]
	pub trait Config: pezframe_system::Config {
		/// The overarching event type.
		#[allow(deprecated)]
		type RuntimeEvent: From<Event<Self>>
			+ IsType<<Self as pezframe_system::Config>::RuntimeEvent>;

		/// Weight information for extrinsics in this pezpallet.
		type WeightInfo: WeightInfo;

		/// Processor for a message.
		///
		/// Storage changes are not rolled back on error.
		///
		/// # Benchmarking
		///
		/// Must be set to [`mock_helpers::NoopMessageProcessor`] for benchmarking.
		/// Other message processors that consume exactly (1, 1) weight for any given message will
		/// work as well. Otherwise the benchmarking will also measure the weight of the message
		/// processor, which is not desired.
		type MessageProcessor: ProcessMessage;

		/// Page/heap size type.
		type Size: BaseArithmetic
			+ Unsigned
			+ Copy
			+ Into<u32>
			+ Member
			+ Encode
			+ Decode
			+ MaxEncodedLen
			+ ConstEncodedLen
			+ TypeInfo
			+ Default;

		/// Code to be called when a message queue changes - either with items introduced or
		/// removed.
		type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

		/// Queried by the pezpallet to check whether a queue can be serviced.
		///
		/// This also applies to manual servicing via `execute_overweight` and `service_queues`. The
		/// value of this is only polled once before servicing the queue. This means that changes to
		/// it that happen *within* the servicing will not be reflected.
		type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

		/// The size of the page; this implies the maximum message size which can be sent.
		///
		/// A good value depends on the expected message sizes, their weights, the weight that is
		/// available for processing them and the maximal needed message size. The maximal message
		/// size is slightly lower than this as defined by [`MaxMessageLenOf`].
		#[pezpallet::constant]
		type HeapSize: Get<Self::Size>;

		/// The maximum number of stale pages (i.e. of overweight messages) allowed before culling
		/// can happen. Once there are more stale pages than this, then historical pages may be
		/// dropped, even if they contain unprocessed overweight messages.
		#[pezpallet::constant]
		type MaxStale: Get<u32>;

		/// The amount of weight (if any) which should be provided to the message queue for
		/// servicing enqueued items `on_initialize`.
		///
		/// This may be legitimately `None` in the case that you will call
		/// `ServiceQueues::service_queues` manually or set [`Self::IdleMaxServiceWeight`] to have
		/// it run in `on_idle`.
		#[pezpallet::constant]
		type ServiceWeight: Get<Option<Weight>>;

		/// The maximum amount of weight (if any) to be used from remaining weight `on_idle` which
		/// should be provided to the message queue for servicing enqueued items `on_idle`.
		/// Useful for teyrchains to process messages at the same block they are received.
		///
		/// If `None`, it will not call `ServiceQueues::service_queues` in `on_idle`.
		#[pezpallet::constant]
		type IdleMaxServiceWeight: Get<Option<Weight>>;
	}

	#[pezpallet::event]
	#[pezpallet::generate_deposit(pub(super) fn deposit_event)]
	pub enum Event<T: Config> {
		/// Message discarded due to an error in the `MessageProcessor` (usually a format error).
		ProcessingFailed {
			/// The `blake2_256` hash of the message.
			id: H256,
			/// The queue of the message.
			origin: MessageOriginOf<T>,
			/// The error that occurred.
			///
			/// This error is pretty opaque. More fine-grained errors need to be emitted as events
			/// by the `MessageProcessor`.
			error: ProcessMessageError,
		},
		/// Message is processed.
		Processed {
			/// The `blake2_256` hash of the message.
			id: H256,
			/// The queue of the message.
			origin: MessageOriginOf<T>,
			/// How much weight was used to process the message.
			weight_used: Weight,
			/// Whether the message was processed.
			///
			/// Note that this does not mean that the underlying `MessageProcessor` was internally
			/// successful. It *solely* means that the MQ pezpallet will treat this as a success
			/// condition and discard the message. Any internal error needs to be emitted as events
			/// by the `MessageProcessor`.
			success: bool,
		},
		/// Message placed in overweight queue.
		OverweightEnqueued {
			/// The `blake2_256` hash of the message.
			id: [u8; 32],
			/// The queue of the message.
			origin: MessageOriginOf<T>,
			/// The page of the message.
			page_index: PageIndex,
			/// The index of the message within the page.
			message_index: T::Size,
		},
		/// This page was reaped.
		PageReaped {
			/// The queue of the page.
			origin: MessageOriginOf<T>,
			/// The index of the page.
			index: PageIndex,
		},
	}

	#[pezpallet::error]
	pub enum Error<T> {
		/// Page is not reapable because it has items remaining to be processed and is not old
		/// enough.
		NotReapable,
		/// Page to be reaped does not exist.
		NoPage,
		/// The referenced message could not be found.
		NoMessage,
		/// The message was already processed and cannot be processed again.
		AlreadyProcessed,
		/// The message is queued for future execution.
		Queued,
		/// There is temporarily not enough weight to continue servicing messages.
		InsufficientWeight,
		/// This message is temporarily unprocessable.
		///
		/// Such errors are expected, but not guaranteed, to resolve themselves eventually through
		/// retrying.
		TemporarilyUnprocessable,
		/// The queue is paused and no message can be executed from it.
		///
		/// This can change at any time and may resolve in the future by re-trying.
		QueuePaused,
		/// Another call is in progress and needs to finish before this call can happen.
		RecursiveDisallowed,
	}

	/// The index of the first and last (non-empty) pages.
	#[pezpallet::storage]
	pub type BookStateFor<T: Config> =
		StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

	/// The origin at which we should begin servicing.
	#[pezpallet::storage]
	pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

	/// The map of page indices to pages.
	#[pezpallet::storage]
	pub type Pages<T: Config> = StorageDoubleMap<
		_,
		Twox64Concat,
		MessageOriginOf<T>,
		Twox64Concat,
		PageIndex,
		Page<T::Size, T::HeapSize>,
		OptionQuery,
	>;

	#[pezpallet::hooks]
	impl<T: Config> Hooks<BlockNumberFor<T>> for Pezpallet<T> {
		fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
			if let Some(weight_limit) = T::ServiceWeight::get() {
				Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
			} else {
				Weight::zero()
			}
		}

		fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
			if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
				// Make use of the remaining weight to process enqueued messages.
				Self::service_queues_impl(
					weight_limit.min(remaining_weight),
					ServiceQueuesContext::OnIdle,
				)
			} else {
				Weight::zero()
			}
		}

		#[cfg(feature = "try-runtime")]
		fn try_state(_: BlockNumberFor<T>) -> Result<(), pezsp_runtime::TryRuntimeError> {
			Self::do_try_state()
		}

		/// Check all compile-time assumptions about [`crate::Config`].
		#[cfg(test)]
		fn integrity_test() {
			Self::do_integrity_test().expect("Pezpallet config is valid; qed")
		}
	}
717
718	#[pezpallet::call]
719	impl<T: Config> Pezpallet<T> {
720		/// Remove a page which has no more messages remaining to be processed or is stale.
721		#[pezpallet::call_index(0)]
722		#[pezpallet::weight(T::WeightInfo::reap_page())]
723		pub fn reap_page(
724			origin: OriginFor<T>,
725			message_origin: MessageOriginOf<T>,
726			page_index: PageIndex,
727		) -> DispatchResult {
728			ensure_signed(origin)?;
729			Self::do_reap_page(&message_origin, page_index)
730		}
731
732		/// Execute an overweight message.
733		///
734		/// Temporary processing errors will be propagated whereas permanent errors are treated
735		/// as a success condition.
736		///
737		/// - `origin`: Must be `Signed`.
738		/// - `message_origin`: The origin from which the message to be executed arrived.
739		/// - `page`: The page in the queue in which the message to be executed is sitting.
740		/// - `index`: The index into the queue of the message to be executed.
741		/// - `weight_limit`: The maximum amount of weight allowed to be consumed in the execution
742		///   of the message.
743		///
744		/// Benchmark complexity considerations: O(index + weight_limit).
745		#[pezpallet::call_index(1)]
746		#[pezpallet::weight(
747			T::WeightInfo::execute_overweight_page_updated().max(
748			T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
749		)]
750		pub fn execute_overweight(
751			origin: OriginFor<T>,
752			message_origin: MessageOriginOf<T>,
753			page: PageIndex,
754			index: T::Size,
755			weight_limit: Weight,
756		) -> DispatchResultWithPostInfo {
757			ensure_signed(origin)?;
758			let actual_weight =
759				Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
760			Ok(Some(actual_weight).into())
761		}
762	}
763}
764
765/// The status of a page after trying to execute its next message.
766#[derive(PartialEq, Debug)]
767enum PageExecutionStatus {
768	/// The execution bailed because there was not enough weight remaining.
769	Bailed,
770	/// The page did not make any progress on its execution.
771	///
772	/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
773	NoProgress,
774	/// No more messages could be loaded. This does _not_ imply `page.is_complete()`.
775	///
776	/// The reasons for this status are:
777	///  - The end of the page is reached but there could still be skipped messages.
778	///  - The storage is corrupted.
779	NoMore,
780}
781
782/// The status after trying to execute the next item of a [`Page`].
783#[derive(PartialEq, Debug)]
784enum ItemExecutionStatus {
785	/// The execution bailed because there was not enough weight remaining.
786	Bailed,
787	/// The item did not make any progress on its execution.
788	///
789	/// This is a transient condition and can be handled by retrying - exactly like [Bailed].
790	NoProgress,
791	/// The item was not found.
792	NoItem,
793	/// The item was executed; the inner `bool` indicates whether it was actually processed.
794	///
795	/// One reason for `false` would be that it is permanently overweight.
796	Executed(bool),
797}
798
799/// The status of an attempt to process a message.
800#[derive(PartialEq)]
801enum MessageExecutionStatus {
802	/// There is not enough weight remaining at present.
803	InsufficientWeight,
804	/// There will never be enough weight.
805	Overweight,
806	/// The message was processed successfully.
807	Processed,
808	/// The message was processed and resulted in a, possibly permanent, error.
809	Unprocessable { permanent: bool },
810	/// The stack depth limit was reached.
811	///
812	/// We cannot just return `Unprocessable` in this case, because the processability of the
813	/// message depends on how the function was called. This may be a permanent error if it was
814	/// called by a top-level function, or a transient error if it was already called in a nested
815	/// function.
816	StackLimitReached,
817}
818
819	/// The context to pass to [`Pezpallet::service_queues_impl`] through the `on_idle` and
820	/// `on_initialize` hooks. We don't want to emit the defensive message if called from the `on_idle` hook.
821#[derive(PartialEq)]
822enum ServiceQueuesContext {
823	/// Context of on_idle hook.
824	OnIdle,
825	/// Context of on_initialize hook.
826	OnInitialize,
827	/// Context `service_queues` trait function.
828	ServiceQueues,
829}
830
831impl<T: Config> Pezpallet<T> {
832	/// Knit `origin` into the ready ring right at the end.
833	///
834	/// Return the two ready ring neighbours of `origin`.
835	fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
836		if let Some(head) = ServiceHead::<T>::get() {
837			let mut head_book_state = BookStateFor::<T>::get(&head);
838			let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
839			let tail = head_neighbours.prev;
840			head_neighbours.prev = origin.clone();
841			head_book_state.ready_neighbours = Some(head_neighbours);
842			BookStateFor::<T>::insert(&head, head_book_state);
843
844			let mut tail_book_state = BookStateFor::<T>::get(&tail);
845			let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
846			tail_neighbours.next = origin.clone();
847			tail_book_state.ready_neighbours = Some(tail_neighbours);
848			BookStateFor::<T>::insert(&tail, tail_book_state);
849
850			Ok(Neighbours { next: head, prev: tail })
851		} else {
852			ServiceHead::<T>::put(origin);
853			Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
854		}
855	}
856
857	fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
858		if origin == &neighbours.next {
859			debug_assert!(
860				origin == &neighbours.prev,
861				"unknitting from single item ring; outgoing must be only item"
862			);
863			// Service queue empty.
864			ServiceHead::<T>::kill();
865		} else {
866			BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
867				if let Some(ref mut n) = book_state.ready_neighbours {
868					n.prev = neighbours.prev.clone()
869				}
870			});
871			BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
872				if let Some(ref mut n) = book_state.ready_neighbours {
873					n.next = neighbours.next.clone()
874				}
875			});
876			if let Some(head) = ServiceHead::<T>::get() {
877				if &head == origin {
878					ServiceHead::<T>::put(neighbours.next);
879				}
880			} else {
881				defensive!("`ServiceHead` must be some if there was a ready queue");
882			}
883		}
884	}
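The knit/unknit functions above maintain a circular doubly-linked ring of ready queues via each book's `ready_neighbours`. A minimal std-only sketch (a hypothetical `Ring` type over `u32` ids, not part of this pezpallet) of the same invariants - knit at the tail, patch both neighbours on unknit, and move the head if the head itself leaves:

```rust
use std::collections::HashMap;

/// Hypothetical circular doubly-linked ring: id -> (prev, next).
struct Ring {
	head: Option<u32>,
	links: HashMap<u32, (u32, u32)>,
}

impl Ring {
	fn new() -> Self {
		Self { head: None, links: HashMap::new() }
	}

	/// Knit `id` in right before the head, i.e. at the end of the ring.
	fn knit(&mut self, id: u32) {
		match self.head {
			None => {
				// First item: it is its own neighbour in both directions.
				self.head = Some(id);
				self.links.insert(id, (id, id));
			},
			Some(head) => {
				let tail = self.links[&head].0;
				self.links.get_mut(&head).unwrap().0 = id; // head.prev = id
				self.links.get_mut(&tail).unwrap().1 = id; // tail.next = id
				self.links.insert(id, (tail, head));
			},
		}
	}

	/// Unknit `id`, patching its neighbours and bumping the head if needed.
	fn unknit(&mut self, id: u32) {
		let (prev, next) = self.links.remove(&id).unwrap();
		if next == id {
			// Only item in the ring: the ring is now empty.
			self.head = None;
		} else {
			self.links.get_mut(&next).unwrap().0 = prev;
			self.links.get_mut(&prev).unwrap().1 = next;
			if self.head == Some(id) {
				self.head = Some(next);
			}
		}
	}
}

fn main() {
	let mut ring = Ring::new();
	for id in [1, 2, 3] {
		ring.knit(id);
	}
	assert_eq!(ring.head, Some(1));
	assert_eq!(ring.links[&1], (3, 2)); // the head's `prev` is the tail
	ring.unknit(1);
	assert_eq!(ring.head, Some(2));
	assert_eq!(ring.links[&2], (3, 3));
}
```

The real code stores the `(prev, next)` pair in `BookStateFor` and the head in `ServiceHead`, but the pointer surgery is the same.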
885
886	/// Tries to bump the current `ServiceHead` to the next ready queue.
887	///
888	/// Returns the current head if it got bumped and `None` otherwise.
889	fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
890		if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
891			return None;
892		}
893
894		if let Some(head) = ServiceHead::<T>::get() {
895			let mut head_book_state = BookStateFor::<T>::get(&head);
896			if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
897				ServiceHead::<T>::put(&head_neighbours.next);
898				Some(head)
899			} else {
900				defensive!("The head must point to a queue in the ready ring");
901				None
902			}
903		} else {
904			None
905		}
906	}
907
908	fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
909		if weight.try_consume(T::WeightInfo::set_service_head()).is_err() {
910			return Err(());
911		}
912
913		// Ensure that we never set the head to an un-ready queue.
914		if BookStateFor::<T>::get(queue).ready_neighbours.is_some() {
915			ServiceHead::<T>::put(queue);
916			Ok(true)
917		} else {
918			Ok(false)
919		}
920	}
921
922	/// The maximal weight that a single message ever can consume.
923	///
924	/// Any message using more than this will be marked as permanently overweight and not
925	/// automatically re-attempted. Returns `None` if the servicing of a message cannot begin.
926	/// `Some(0)` means that only messages with no weight may be served.
927	fn max_message_weight(limit: Weight) -> Option<Weight> {
928		let service_weight = T::ServiceWeight::get().unwrap_or_default();
929		let on_idle_weight = T::IdleMaxServiceWeight::get().unwrap_or_default();
930
931		// Whichever of the two weights is bigger is used as the maximum weight. If a
932		// message is tried in one context and fails, it will be retried in the other context later.
933		let max_message_weight =
934			if service_weight.any_gt(on_idle_weight) { service_weight } else { on_idle_weight };
935
936		if max_message_weight.is_zero() {
937			// If no service weight is set, we need to use the given limit as max message weight.
938			limit.checked_sub(&Self::single_msg_overhead())
939		} else {
940			max_message_weight.checked_sub(&Self::single_msg_overhead())
941		}
942	}
943
944	/// The overhead of servicing a single message.
945	fn single_msg_overhead() -> Weight {
946		T::WeightInfo::bump_service_head()
947			.saturating_add(T::WeightInfo::service_queue_base())
948			.saturating_add(
949				T::WeightInfo::service_page_base_completion()
950					.max(T::WeightInfo::service_page_base_no_completion()),
951			)
952			.saturating_add(T::WeightInfo::service_page_item())
953			.saturating_add(T::WeightInfo::ready_ring_unknit())
954	}
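Reduced to a single dimension, the budget selection in `max_message_weight` is simple arithmetic. A hypothetical scalar sketch (plain `u64` stand-ins; the real `Weight` is two-dimensional, with ref-time and proof-size compared via `any_gt`):

```rust
/// Hypothetical single-dimension sketch of `max_message_weight`.
/// `service` / `on_idle` are the configured service weights (0 = unset),
/// `limit` is the caller-provided limit, `overhead` the fixed per-message cost.
fn max_message_weight(service: u64, on_idle: u64, limit: u64, overhead: u64) -> Option<u64> {
	// The bigger of the two configured service weights wins.
	let max = service.max(on_idle);
	// If neither is set, fall back to the caller-provided limit.
	let budget = if max == 0 { limit } else { max };
	// A message may use whatever remains after the fixed servicing overhead;
	// `None` means servicing cannot even begin.
	budget.checked_sub(overhead)
}

fn main() {
	// Overhead exceeds the budget: servicing cannot begin.
	assert_eq!(max_message_weight(5, 0, 0, 10), None);
	// Budget 100 with overhead 10 leaves 90 for the message itself.
	assert_eq!(max_message_weight(100, 40, 0, 10), Some(90));
	// No service weights configured: the passed-in limit is the budget.
	assert_eq!(max_message_weight(0, 0, 50, 10), Some(40));
}
```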
955
956	/// Checks invariants of the pezpallet config.
957	///
958	/// The results of this can only be relied upon if the config values are set to constants.
959	#[cfg(test)]
960	fn do_integrity_test() -> Result<(), String> {
961		ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
962
963		let max_block = T::BlockWeights::get().max_block;
964
965		if let Some(service) = T::ServiceWeight::get() {
966			if Self::max_message_weight(service).is_none() {
967				return Err(format!(
968					"ServiceWeight too low: {}. Must be at least {}",
969					service,
970					Self::single_msg_overhead(),
971				));
972			}
973
974			if service.any_gt(max_block) {
975				return Err(format!(
976					"ServiceWeight {service} is bigger than max block weight {max_block}"
977				));
978			}
979		}
980
981		if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
982			if on_idle.any_gt(max_block) {
983				return Err(format!(
984					"IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
985				));
986			}
987		}
988
989		if let (Some(service_weight), Some(on_idle)) =
990			(T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
991		{
992			if !(service_weight.all_gt(on_idle)
993				|| on_idle.all_gt(service_weight)
994				|| service_weight == on_idle)
995			{
996				return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into());
997			}
998		}
999
1000		Ok(())
1001	}
1002
1003	fn do_enqueue_messages<'a>(
1004		origin: &MessageOriginOf<T>,
1005		messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
1006	) {
1007		let mut book_state = BookStateFor::<T>::get(origin);
1008
1009		let mut maybe_page = None;
1010		// Check if we already have a page in progress.
1011		if book_state.end > book_state.begin {
1012			debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
1013			maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
1014				defensive!("Corruption: referenced page doesn't exist.");
1015				None
1016			});
1017		}
1018
1019		for message in messages {
1020			// Try to append the message to the current page if possible.
1021			if let Some(mut page) = maybe_page {
1022				maybe_page = match page.try_append_message::<T>(message) {
1023					Ok(_) => Some(page),
1024					Err(_) => {
1025						// Not enough space on the current page.
1026						// Let's save it, since we'll move to a new one.
1027						Pages::<T>::insert(origin, book_state.end - 1, page);
1028						None
1029					},
1030				}
1031			}
1032			// If not, add it to a new page.
1033			if maybe_page.is_none() {
1034				book_state.end.saturating_inc();
1035				book_state.count.saturating_inc();
1036				maybe_page = Some(Page::from_message::<T>(message));
1037			}
1038
1039			// Account for the message that we just added.
1040			book_state.message_count.saturating_inc();
1041			book_state
1042				.size
1043				// This should be payload size, but here the payload *is* the message.
1044				.saturating_accrue(message.len() as u64);
1045		}
1046
1047		// Save the last page that we created.
1048		if let Some(page) = maybe_page {
1049			Pages::<T>::insert(origin, book_state.end - 1, page);
1050		}
1051
1052		// Insert book state for current origin into the ready queue.
1053		if book_state.ready_neighbours.is_none() {
1054			match Self::ready_ring_knit(origin) {
1055				Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
1056				Err(()) => {
1057					defensive!("Ring state invalid when knitting");
1058				},
1059			}
1060		}
1061
1062		// NOTE: `T::QueueChangeHandler` is called by the caller.
1063		BookStateFor::<T>::insert(origin, book_state);
1064	}
1065
1066	/// Try to execute a single message that was marked as overweight.
1067	///
1068	/// The `weight_limit` is the weight that can be consumed to execute the message. The base
1069	/// weight of the function itself must be measured by the caller.
1070	pub fn do_execute_overweight(
1071		origin: MessageOriginOf<T>,
1072		page_index: PageIndex,
1073		index: T::Size,
1074		weight_limit: Weight,
1075	) -> Result<Weight, Error<T>> {
1076		match with_service_mutex(|| {
1077			Self::do_execute_overweight_inner(origin, page_index, index, weight_limit)
1078		}) {
1079			Err(()) => Err(Error::<T>::RecursiveDisallowed),
1080			Ok(x) => x,
1081		}
1082	}
1083
1084	/// Same as `do_execute_overweight` but must be called while holding the `service_mutex`.
1085	fn do_execute_overweight_inner(
1086		origin: MessageOriginOf<T>,
1087		page_index: PageIndex,
1088		index: T::Size,
1089		weight_limit: Weight,
1090	) -> Result<Weight, Error<T>> {
1091		let mut book_state = BookStateFor::<T>::get(&origin);
1092		ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);
1093
1094		let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
1095		let (pos, is_processed, payload) =
1096			page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
1097		let payload_len = payload.len() as u64;
1098		ensure!(
1099			page_index < book_state.begin
1100				|| (page_index == book_state.begin && pos < page.first.into() as usize),
1101			Error::<T>::Queued
1102		);
1103		ensure!(!is_processed, Error::<T>::AlreadyProcessed);
1104		use MessageExecutionStatus::*;
1105		let mut weight_counter = WeightMeter::with_limit(weight_limit);
1106		match Self::process_message_payload(
1107			origin.clone(),
1108			page_index,
1109			index,
1110			payload,
1111			&mut weight_counter,
1112			Weight::MAX,
1113			// ^^^ We never recognise it as permanently overweight, since that would result in an
1114			// additional overweight event being deposited.
1115		) {
1116			Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
1117			StackLimitReached | Unprocessable { permanent: false } => {
1118				Err(Error::<T>::TemporarilyUnprocessable)
1119			},
1120			Unprocessable { permanent: true } | Processed => {
1121				page.note_processed_at_pos(pos);
1122				book_state.message_count.saturating_dec();
1123				book_state.size.saturating_reduce(payload_len);
1124				let page_weight = if page.remaining.is_zero() {
1125					debug_assert!(
1126						page.remaining_size.is_zero(),
1127						"no messages remaining; no space taken; qed"
1128					);
1129					Pages::<T>::remove(&origin, page_index);
1130					debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
1131					book_state.count.saturating_dec();
1132					T::WeightInfo::execute_overweight_page_removed()
1133				// no need to consider .first or ready ring since processing an overweight page
1134				// would not alter that state.
1135				} else {
1136					Pages::<T>::insert(&origin, page_index, page);
1137					T::WeightInfo::execute_overweight_page_updated()
1138				};
1139				BookStateFor::<T>::insert(&origin, &book_state);
1140				T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1141				Ok(weight_counter.consumed().saturating_add(page_weight))
1142			},
1143		}
1144	}
1145
1146	/// Remove a stale page or one which has no more messages remaining to be processed.
1147	fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1148		match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
1149			Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
1150			Ok(x) => x,
1151		}
1152	}
1153
1154	/// Same as `do_reap_page` but must be called while holding the `service_mutex`.
1155	fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
1156		let mut book_state = BookStateFor::<T>::get(origin);
1157		// definitely not reapable if the page's index is no less than the `begin`ning of ready
1158		// pages.
1159		ensure!(page_index < book_state.begin, Error::<T>::NotReapable);
1160
1161		let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;
1162
1163		// definitely reapable if the page has no messages in it.
1164		let reapable = page.remaining.is_zero();
1165
1166		// also reapable if the page index has dropped below our watermark.
1167		let cullable = || {
1168			let total_pages = book_state.count;
1169			let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);
1170
1171			// The number of stale pages - i.e. pages which contain unprocessed overweight messages.
1172			// We would prefer to keep these around but will restrict how far into history they can
1173			// extend if we notice that there are too many of them.
1174			//
1175			// We don't know *where* in history these pages are so we use a dynamic formula which
1176			// reduces the historical time horizon as the stale pages pile up and increases it as
1177			// they reduce.
1178			let stale_pages = total_pages - ready_pages;
1179
1180			// The maximum number of stale pages (i.e. of overweight messages) allowed before
1181			// culling can happen at all. Once there are more stale pages than this, then historical
1182			// pages may be dropped, even if they contain unprocessed overweight messages.
1183			let max_stale = T::MaxStale::get();
1184
1185			// The amount beyond the maximum which are being used. If it's not beyond the maximum
1186			// then we exit now since no culling is needed.
1187			let overflow = match stale_pages.checked_sub(max_stale + 1) {
1188				Some(x) => x + 1,
1189				None => return false,
1190			};
1191
1192			// The special formula which tells us how deep into index-history we will cull pages. As
1193			// the overflow is greater (and thus the need to drop items from storage is more urgent)
1194			// this is reduced, allowing a greater range of pages to be culled.
1195			// With a minimum `overflow` (`1`), this returns `max_stale ** 2`, indicating we only
1196			// cull beyond that number of indices deep into history.
1197			// As the overflow increases, our depth reduces down to a limit of `max_stale`. We
1198			// never want to reduce below this since this will certainly allow enough pages to be
1199			// culled in order to bring `overflow` back to zero.
1200			let backlog = (max_stale * max_stale / overflow).max(max_stale);
1201
1202			let watermark = book_state.begin.saturating_sub(backlog);
1203			page_index < watermark
1204		};
1205		ensure!(reapable || cullable(), Error::<T>::NotReapable);
1206
1207		Pages::<T>::remove(origin, page_index);
1208		debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
1209		book_state.count.saturating_dec();
1210		book_state.message_count.saturating_reduce(page.remaining.into() as u64);
1211		book_state.size.saturating_reduce(page.remaining_size.into() as u64);
1212		BookStateFor::<T>::insert(origin, &book_state);
1213		T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
1214		Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });
1215
1216		Ok(())
1217	}
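The `cullable` closure above boils down to the `backlog = (max_stale^2 / overflow).max(max_stale)` formula. A std-only sketch (a hypothetical helper, not pezpallet code) showing how the watermark moves as stale pages pile up:

```rust
/// Hypothetical sketch of the culling watermark: returns the first page index
/// below which pages may be culled, or `None` if no culling is allowed yet.
fn watermark(begin: u32, stale_pages: u32, max_stale: u32) -> Option<u32> {
	// No culling at all until the stale pages exceed `max_stale`.
	let overflow = stale_pages.checked_sub(max_stale + 1)? + 1;
	// Minimum overflow (1) gives the deepest protected horizon, `max_stale ** 2`;
	// it shrinks towards `max_stale` as the overflow grows.
	let backlog = (max_stale * max_stale / overflow).max(max_stale);
	Some(begin.saturating_sub(backlog))
}

fn main() {
	// Not enough stale pages: nothing may be culled.
	assert_eq!(watermark(100, 3, 3), None);
	// overflow = 1 -> backlog = 9 -> pages below index 91 are cullable.
	assert_eq!(watermark(100, 4, 3), Some(91));
	// Large overflow -> backlog bottoms out at `max_stale` = 3.
	assert_eq!(watermark(100, 20, 3), Some(97));
}
```

A page is then reapable if it is empty, or if its index has dropped below this watermark.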
1218
1219	/// Execute any messages remaining to be processed in the queue of `origin`, using up to
1220	/// `weight_limit` to do so. Any messages which would take more than `overweight_limit` to
1221	/// execute are deemed overweight and ignored.
1222	fn service_queue(
1223		origin: MessageOriginOf<T>,
1224		weight: &mut WeightMeter,
1225		overweight_limit: Weight,
1226	) -> (bool, Option<MessageOriginOf<T>>) {
1227		use PageExecutionStatus::*;
1228		if weight
1229			.try_consume(
1230				T::WeightInfo::service_queue_base()
1231					.saturating_add(T::WeightInfo::ready_ring_unknit()),
1232			)
1233			.is_err()
1234		{
1235			return (false, None);
1236		}
1237
1238		let mut book_state = BookStateFor::<T>::get(&origin);
1239		let mut total_processed = 0;
1240		if T::QueuePausedQuery::is_paused(&origin) {
1241			let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1242			return (false, next_ready);
1243		}
1244
1245		while book_state.end > book_state.begin {
1246			let (processed, status) =
1247				Self::service_page(&origin, &mut book_state, weight, overweight_limit);
1248			total_processed.saturating_accrue(processed);
1249			match status {
1250				// Store the page progress and do not go to the next one.
1251				Bailed | NoProgress => break,
1252				// Go to the next page if this one is at the end.
1253				NoMore => (),
1254			};
1255			book_state.begin.saturating_inc();
1256		}
1257		let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
1258		if book_state.begin >= book_state.end {
1259			// No longer ready - unknit.
1260			if let Some(neighbours) = book_state.ready_neighbours.take() {
1261				Self::ready_ring_unknit(&origin, neighbours);
1262			} else if total_processed > 0 {
1263				defensive!("Freshly processed queue must have been ready");
1264			}
1265		}
1266		BookStateFor::<T>::insert(&origin, &book_state);
1267		if total_processed > 0 {
1268			T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
1269		}
1270		(total_processed > 0, next_ready)
1271	}
1272
1273	/// Service as many messages of a page as possible.
1274	///
1275	/// Returns how many messages were processed and the page's status.
1276	fn service_page(
1277		origin: &MessageOriginOf<T>,
1278		book_state: &mut BookStateOf<T>,
1279		weight: &mut WeightMeter,
1280		overweight_limit: Weight,
1281	) -> (u32, PageExecutionStatus) {
1282		use PageExecutionStatus::*;
1283		if weight
1284			.try_consume(
1285				T::WeightInfo::service_page_base_completion()
1286					.max(T::WeightInfo::service_page_base_no_completion()),
1287			)
1288			.is_err()
1289		{
1290			return (0, Bailed);
1291		}
1292
1293		let page_index = book_state.begin;
1294		let mut page = match Pages::<T>::get(origin, page_index) {
1295			Some(p) => p,
1296			None => {
1297				defensive!("message-queue: referenced page not found");
1298				return (0, NoMore);
1299			},
1300		};
1301
1302		let mut total_processed = 0;
1303
1304		// Execute as many messages as possible.
1305		let status = loop {
1306			use ItemExecutionStatus::*;
1307			match Self::service_page_item(
1308				origin,
1309				page_index,
1310				book_state,
1311				&mut page,
1312				weight,
1313				overweight_limit,
1314			) {
1315				Bailed => break PageExecutionStatus::Bailed,
1316				NoItem => break PageExecutionStatus::NoMore,
1317				NoProgress => break PageExecutionStatus::NoProgress,
1318				// Keep going as long as we make progress...
1319				Executed(true) => total_processed.saturating_inc(),
1320				Executed(false) => (),
1321			}
1322		};
1323
1324		if page.is_complete() {
1325			debug_assert!(status != Bailed, "we never bail if a page became complete");
1326			Pages::<T>::remove(origin, page_index);
1327			debug_assert!(book_state.count > 0, "completing a page implies there are pages");
1328			book_state.count.saturating_dec();
1329		} else {
1330			Pages::<T>::insert(origin, page_index, page);
1331		}
1332		(total_processed, status)
1333	}
1334
1335	/// Execute the next message of a page.
1336	pub(crate) fn service_page_item(
1337		origin: &MessageOriginOf<T>,
1338		page_index: PageIndex,
1339		book_state: &mut BookStateOf<T>,
1340		page: &mut PageOf<T>,
1341		weight: &mut WeightMeter,
1342		overweight_limit: Weight,
1343	) -> ItemExecutionStatus {
1344		use MessageExecutionStatus::*;
1345		// This ugly pre-checking is needed for the invariant
1346		// "we never bail if a page became complete".
1347		if page.is_complete() {
1348			return ItemExecutionStatus::NoItem;
1349		}
1350		if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
1351			return ItemExecutionStatus::Bailed;
1352		}
1353
1354		let payload = &match page.peek_first() {
1355			Some(m) => m,
1356			None => return ItemExecutionStatus::NoItem,
1357		}[..];
1358		let payload_len = payload.len() as u64;
1359
1360		// Store these for the case that `process_message_payload` is recursive.
1361		Pages::<T>::insert(origin, page_index, &*page);
1362		BookStateFor::<T>::insert(origin, &*book_state);
1363
1364		let res = Self::process_message_payload(
1365			origin.clone(),
1366			page_index,
1367			page.first_index,
1368			payload,
1369			weight,
1370			overweight_limit,
1371		);
1372
1373		// And restore them afterwards to see the changes of a recursive call.
1374		*book_state = BookStateFor::<T>::get(origin);
1375		if let Some(new_page) = Pages::<T>::get(origin, page_index) {
1376			*page = new_page;
1377		} else {
1378			defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
1379			return ItemExecutionStatus::NoItem;
1380		};
1381
1382		let is_processed = match res {
1383			InsufficientWeight => return ItemExecutionStatus::Bailed,
1384			Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
1385			Processed | Unprocessable { permanent: true } | StackLimitReached => true,
1386			Overweight => false,
1387		};
1388
1389		if is_processed {
1390			book_state.message_count.saturating_dec();
1391			book_state.size.saturating_reduce(payload_len as u64);
1392		}
1393		page.skip_first(is_processed);
1394		ItemExecutionStatus::Executed(is_processed)
1395	}
1396
1397	/// Ensure the correctness of state of this pezpallet.
1398	///
1399	/// # Assumptions
1400	///
1401	/// If `ServiceHead` points to a ready queue, then the `BookState` of that queue has:
1402	///
1403	/// * `message_count` > 0
1404	/// * `size` > 0
1405	/// * `end` > `begin`
1406	/// * Some(ready_neighbours)
1407	/// * If `ready_neighbours.next` == self.origin, then `ready_neighbours.prev` == self.origin
1408	///   (only queue in ring)
1409	///
1410	/// For `Pages` from `begin` to `end - 1` in `BookState`:
1411	///
1412	/// * `remaining` > 0
1413	/// * `remaining_size` > 0
1414	/// * `first` <= `last`
1415	/// * Every page can be decoded by the `peek_*` functions
1416	#[cfg(any(test, feature = "try-runtime", feature = "std"))]
1417	pub fn do_try_state() -> Result<(), pezsp_runtime::TryRuntimeError> {
1418		// Checking memory corruption for BookStateFor
1419		ensure!(
1420			BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
1421			"Memory Corruption in BookStateFor"
1422		);
1423		// Checking memory corruption for Pages
1424		ensure!(
1425			Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
1426			"Memory Corruption in Pages"
1427		);
1428
1429		// Basic checks for each book
1430		for book in BookStateFor::<T>::iter_values() {
1431			ensure!(book.end >= book.begin, "Invariant");
1432			ensure!(book.end < 1 << 30, "Likely overflow or corruption");
1433			ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
1434			ensure!(book.size < 1 << 30, "Likely overflow or corruption");
1435			ensure!(book.count < 1 << 30, "Likely overflow or corruption");
1436
1437			let fp: QueueFootprint = book.into();
1438			ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
1439		}
1440
1441		// Loop around this origin.
1442		let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };
1443
1444		while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
1445			ensure!(
1446				BookStateFor::<T>::contains_key(&head),
1447				"Service head must point to an existing book"
1448			);
1449
1450			let head_book_state = BookStateFor::<T>::get(&head);
1451			ensure!(
1452				head_book_state.message_count > 0,
1453				"There must be some messages if in ReadyRing"
1454			);
1455			ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
1456			ensure!(
1457				head_book_state.end > head_book_state.begin,
1458			"End > Begin if unprocessed messages exist"
1459			);
1460			ensure!(
1461				head_book_state.ready_neighbours.is_some(),
1462				"There must be neighbours if in ReadyRing"
1463			);
1464
1465			if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
1466				ensure!(
1467					head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
1468					"Can only happen if only queue in ReadyRing"
1469				);
1470			}
1471
1472			for page_index in head_book_state.begin..head_book_state.end {
1473				let page = Pages::<T>::get(&head, page_index).unwrap();
1474				let remaining_messages = page.remaining;
1475				let mut counted_remaining_messages: u32 = 0;
1476				ensure!(
1477					remaining_messages > 0.into(),
1478				"There must be some messages that have not been processed yet!"
1479				);
1480
1481				for i in 0..u32::MAX {
1482					if let Some((_, processed, _)) = page.peek_index(i as usize) {
1483						if !processed {
1484							counted_remaining_messages += 1;
1485						}
1486					} else {
1487						break;
1488					}
1489				}
1490
1491				ensure!(
1492					remaining_messages.into() == counted_remaining_messages,
1493					"Memory Corruption"
1494				);
1495			}
1496
1497			if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
1498				break;
1499			}
1500		}
1501		Ok(())
1502	}
1503
1504	/// Print the pages in each queue and the messages in each page.
1505	///
1506	/// Processed messages are prefixed with a `*` and the current `begin`ning page with a `>`.
1507	///
1508	/// # Example output
1509	///
1510	/// ```text
1511	/// queue Here:
1512	///   page 0: []
1513	/// > page 1: []
1514	///   page 2: ["\0weight=4", "\0c", ]
1515	///   page 3: ["\0bigbig 1", ]
1516	///   page 4: ["\0bigbig 2", ]
1517	///   page 5: ["\0bigbig 3", ]
1518	/// ```
1519	#[cfg(feature = "std")]
1520	pub fn debug_info() -> String {
1521		let mut info = String::new();
1522		for (origin, book_state) in BookStateFor::<T>::iter() {
1523			let mut queue = format!("queue {:?}:\n", &origin);
1524			let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
1525			pages.sort_by(|(a, _), (b, _)| a.cmp(b));
1526			for (page_index, mut page) in pages.into_iter() {
1527				let page_info = if book_state.begin == page_index { ">" } else { " " };
1528				let mut page_info = format!(
1529					"{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
1530					page_info, page_index, page.first, page.last, page.remaining
1531				);
1532				for i in 0..u32::MAX {
1533					if let Some((_, processed, message)) =
1534						page.peek_index(i.try_into().expect("std-only code"))
1535					{
1536						let msg = String::from_utf8_lossy(message);
1537						if processed {
1538							page_info.push('*');
1539						}
1540						page_info.push_str(&format!("{:?}, ", msg));
1541						page.skip_first(true);
1542					} else {
1543						break;
1544					}
1545				}
1546				page_info.push_str("]\n");
1547				queue.push_str(&page_info);
1548			}
1549			info.push_str(&queue);
1550		}
1551		info
1552	}
1553
1554	/// Process a single message.
1555	///
1556	/// The base weight of this function needs to be accounted for by the caller. `meter` tracks the
1557	/// remaining weight to process the message. `overweight_limit` is the maximum weight that a
1558	/// message can ever consume. Messages above this limit are marked as permanently overweight.
1559	/// This process is also transactional; any error that occurs while processing a message
1560	/// causes its storage changes to be rolled back.
1561	fn process_message_payload(
1562		origin: MessageOriginOf<T>,
1563		page_index: PageIndex,
1564		message_index: T::Size,
1565		message: &[u8],
1566		meter: &mut WeightMeter,
1567		overweight_limit: Weight,
1568	) -> MessageExecutionStatus {
1569		let mut id = pezsp_io::hashing::blake2_256(message);
1570		use ProcessMessageError::*;
1571		let prev_consumed = meter.consumed();
1572
1573		let transaction =
1574			storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
1575				let res =
1576					T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
1577				match &res {
1578					Ok(_) => TransactionOutcome::Commit(Ok(res)),
1579					Err(_) => TransactionOutcome::Rollback(Ok(res)),
1580				}
1581			});
1582
1583		let transaction = match transaction {
1584			Ok(result) => result,
1585			_ => {
1586				defensive!(
1587					"Error occurred processing message, storage changes will be rolled back"
1588				);
1589				return MessageExecutionStatus::Unprocessable { permanent: true };
1590			},
1591		};
1592
1593		match transaction {
1594			Err(Overweight(w)) if w.any_gt(overweight_limit) => {
1595				// Permanently overweight.
1596				Self::deposit_event(Event::<T>::OverweightEnqueued {
1597					id,
1598					origin,
1599					page_index,
1600					message_index,
1601				});
1602				MessageExecutionStatus::Overweight
1603			},
1604			Err(Overweight(_)) => {
1605				// Temporarily overweight - save progress and stop processing this
1606				// queue.
1607				MessageExecutionStatus::InsufficientWeight
1608			},
1609			Err(Yield) => {
1610				// Processing should be reattempted later.
1611				MessageExecutionStatus::Unprocessable { permanent: false }
1612			},
1613			Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
1614				// Permanent error - drop
1615				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1616				MessageExecutionStatus::Unprocessable { permanent: true }
1617			},
1618			Err(error @ StackLimitReached) => {
1619				Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
1620				MessageExecutionStatus::StackLimitReached
1621			},
1622			Ok(success) => {
1623				// Success
1624				let weight_used = meter.consumed().saturating_sub(prev_consumed);
1625				Self::deposit_event(Event::<T>::Processed {
1626					id: id.into(),
1627					origin,
1628					weight_used,
1629					success,
1630				});
1631				MessageExecutionStatus::Processed
1632			},
1633		}
1634	}
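The commit-or-rollback behaviour documented above can be sketched in plain `std` Rust. This is a toy model under stated assumptions, not the actual `storage::with_transaction` API: the `Outcome` enum, the `with_transaction` helper and the `HashMap`-backed storage below are illustrative stand-ins.

```rust
use std::collections::HashMap;

/// Outcome of a storage transaction, mirroring the commit-or-rollback
/// pattern used by `process_message_payload` (illustrative, not the real
/// `TransactionOutcome` type).
enum Outcome<T> {
    Commit(T),
    Rollback(T),
}

/// Run `f` against `storage`; keep its writes only on `Commit`, restore the
/// pre-call snapshot on `Rollback`.
fn with_transaction<T>(
    storage: &mut HashMap<String, u32>,
    f: impl FnOnce(&mut HashMap<String, u32>) -> Outcome<T>,
) -> T {
    let snapshot = storage.clone();
    match f(storage) {
        Outcome::Commit(v) => v,
        Outcome::Rollback(v) => {
            // Discard every write made by `f`.
            *storage = snapshot;
            v
        },
    }
}

fn demo() -> (u32, u32, bool) {
    let mut storage = HashMap::new();
    storage.insert("counter".to_string(), 1);
    // A successful handler commits its write.
    let ok: Result<(), ()> = with_transaction(&mut storage, |s| {
        s.insert("counter".to_string(), 2);
        Outcome::Commit(Ok(()))
    });
    let after_commit = storage["counter"];
    // A failing handler has its write rolled back.
    let err: Result<(), ()> = with_transaction(&mut storage, |s| {
        s.insert("counter".to_string(), 99);
        Outcome::Rollback(Err(()))
    });
    (after_commit, storage["counter"], ok.is_ok() && err.is_err())
}
```

Note how, as in the pallet code, the handler's result value survives the rollback: only the storage writes are discarded.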

	fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
		let mut weight = WeightMeter::with_limit(weight_limit);

		// Get the maximum weight that processing a single message may take:
		let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
			if matches!(context, ServiceQueuesContext::OnInitialize) {
				defensive!("Not enough weight to service a single message.");
			}
			Weight::zero()
		});

		match with_service_mutex(|| {
			let mut next = match Self::bump_service_head(&mut weight) {
				Some(h) => h,
				None => return weight.consumed(),
			};
			// The last queue that did not make any progress.
			// The loop aborts as soon as it arrives at this queue again without making any
			// progress on other queues in between.
			let mut last_no_progress = None;

			loop {
				let (progressed, n) =
					Self::service_queue(next.clone(), &mut weight, overweight_limit);
				next = match n {
					Some(n) => {
						if !progressed {
							if last_no_progress == Some(n.clone()) {
								break;
							}
							if last_no_progress.is_none() {
								last_no_progress = Some(next.clone())
							}
							n
						} else {
							last_no_progress = None;
							n
						}
					},
					None => break,
				}
			}
			weight.consumed()
		}) {
			Err(()) => weight.consumed(),
			Ok(w) => w,
		}
	}
}
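The `last_no_progress` termination logic can be modelled with a minimal round-robin loop over message counts. Everything here (`serve_round_robin`, the scalar `u32` budget, treating "progress" as popping one message) is an illustrative simplification of `service_queues_impl`, not its actual types.

```rust
/// Serve queues round-robin, stopping once a full pass over the ring makes
/// no progress: record the first queue that fails to progress, reset the
/// record whenever any queue progresses, and abort when we are about to
/// revisit the recorded queue.
fn serve_round_robin(mut queues: Vec<u32>, mut budget: u32) -> (Vec<u32>, u32) {
    if queues.is_empty() {
        return (queues, 0);
    }
    let mut idx = 0;
    let mut last_no_progress: Option<usize> = None;
    let mut consumed = 0;
    loop {
        let next = (idx + 1) % queues.len();
        // "Progress" = we had budget left to pop one message from this queue.
        let progressed = queues[idx] > 0 && budget > 0;
        if progressed {
            queues[idx] -= 1;
            budget -= 1;
            consumed += 1;
            last_no_progress = None;
        } else {
            // Abort once we arrive back at the recorded queue with no
            // progress on any queue in between.
            if last_no_progress == Some(next) {
                break;
            }
            if last_no_progress.is_none() {
                last_no_progress = Some(idx);
            }
        }
        idx = next;
    }
    (queues, consumed)
}
```

With ample budget every message is drained; with a tight budget the loop stops after one fruitless pass instead of spinning forever.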

impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pezpallet<T> {
	fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
		Pezpallet::<T>::set_service_head(weight, origin)
	}
}

/// Run a closure that errors on re-entrance. Meant to be used by anything that services queues.
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
	// Holds the singleton token instance.
	environmental::environmental!(token: Option<()>);

	token::using_once(&mut Some(()), || {
		// The first `ok_or` should always be `Ok` since we are inside a `using_once`.
		let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;

		// Put the token back when we're done.
		defer! {
			token::with(|t| {
				*t = Some(hold);
			});
		}

		Ok(f())
	})
}
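A std-only analogue of this re-entrancy token can be written with `thread_local!` and `Cell` instead of the `environmental` crate. Note one deliberate simplification: the real code restores the token via `defer!` even on unwind, which this sketch omits.

```rust
use std::cell::Cell;

thread_local! {
    // Singleton token; `false` while a guarded closure is running.
    static TOKEN: Cell<bool> = Cell::new(true);
}

/// Run `f`, erroring on re-entrance (a toy analogue of `with_service_mutex`;
/// unlike the original, the token is not restored if `f` panics).
fn with_mutex<R>(f: impl FnOnce() -> R) -> Result<R, ()> {
    // Take the token; fail if a caller further up the stack already holds it.
    if !TOKEN.with(|t| t.replace(false)) {
        return Err(());
    }
    let r = f();
    // Put the token back when we're done.
    TOKEN.with(|t| t.set(true));
    Ok(r)
}
```

A nested call observes the taken token and fails, while sequential calls each succeed because the token is returned between them.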

/// Provides a [`pezsp_core::Get`] to access the `MEL` of a [`codec::MaxEncodedLen`] type.
pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
	fn get() -> u32 {
		T::max_encoded_len() as u32
	}
}

/// Calculates the maximum message length and exposes it through the [`pezsp_core::Get`] trait.
pub struct MaxMessageLen<Origin, Size, HeapSize>(
	core::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
	for MaxMessageLen<Origin, Size, HeapSize>
{
	fn get() -> u32 {
		(HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
	}
}
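Numerically, `MaxMessageLen` is just a saturating subtraction of the item header's maximal encoded length from the page heap size. The concrete values below are illustrative, not this pezpallet's actual configuration:

```rust
/// Maximum message length = page heap size minus the maximal encoded size of
/// one item header. Saturating so that an absurdly small heap yields 0
/// rather than underflowing (hypothetical numbers, for illustration only).
fn max_message_len(heap_size: u32, item_header_mel: u32) -> u32 {
    heap_size.saturating_sub(item_header_mel)
}
```

For example, a 64 KiB heap with an 8-byte header leaves 65 528 bytes per message, while a heap smaller than the header saturates to 0.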

/// The maximal message length.
pub type MaxMessageLenOf<T> =
	MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded origin length.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The `MessageOrigin` of this pezpallet.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The maximal heap size of a page.
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The [`Page`] of this pezpallet.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The [`BookState`] of this pezpallet.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;

/// Converts a [`pezsp_core::Get`] whose return type can be converted into a `u32` into a `Get`
/// that returns a `u32`.
pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
	fn get() -> u32 {
		T::get().into()
	}
}

impl<T: Config> ServiceQueues for Pezpallet<T> {
	type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

	fn service_queues(weight_limit: Weight) -> Weight {
		Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
	}

	/// Execute a single overweight message.
	///
	/// The weight limit must be enough for `execute_overweight` and the message execution itself.
	fn execute_overweight(
		weight_limit: Weight,
		(message_origin, page, index): Self::OverweightMessageAddress,
	) -> Result<Weight, ExecuteOverweightError> {
		let mut weight = WeightMeter::with_limit(weight_limit);
		if weight
			.try_consume(
				T::WeightInfo::execute_overweight_page_removed()
					.max(T::WeightInfo::execute_overweight_page_updated()),
			)
			.is_err()
		{
			return Err(ExecuteOverweightError::InsufficientWeight);
		}

		Pezpallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining())
			.map_err(|e| match e {
				Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
				Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
				Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
				Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued => {
					ExecuteOverweightError::NotFound
				},
				Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
				_ => ExecuteOverweightError::Other,
			})
	}
}
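The up-front weight reservation in `execute_overweight` (consume the worst case of the two benchmarked paths, then hand the remainder to the actual execution) can be sketched with a one-dimensional meter. `Meter` and `reserve_overhead` are illustrative names, and the scalar `u64` stands in for the real two-dimensional `Weight`:

```rust
/// A toy weight meter: tracks consumption against a hard limit.
struct Meter {
    limit: u64,
    consumed: u64,
}

impl Meter {
    fn with_limit(limit: u64) -> Self {
        Self { limit, consumed: 0 }
    }
    /// Consume `w` if it fits within the limit, else leave the meter untouched.
    fn try_consume(&mut self, w: u64) -> Result<(), ()> {
        let next = self.consumed.checked_add(w).ok_or(())?;
        if next > self.limit {
            return Err(());
        }
        self.consumed = next;
        Ok(())
    }
    fn remaining(&self) -> u64 {
        self.limit - self.consumed
    }
}

/// Reserve the worst case of the two benchmarked bookkeeping paths up front;
/// return the weight left for message execution, or `Err` if the limit
/// cannot even cover the bookkeeping.
fn reserve_overhead(limit: u64, page_removed: u64, page_updated: u64) -> Result<u64, ()> {
    let mut meter = Meter::with_limit(limit);
    meter.try_consume(page_removed.max(page_updated))?;
    Ok(meter.remaining())
}
```

Reserving the maximum of both paths up front guarantees the bookkeeping can always be paid for, whichever branch is taken afterwards.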

impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pezpallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	fn enqueue_message(
		message: BoundedSlice<u8, Self::MaxMessageLen>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_messages(&origin, [message].into_iter());
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}

	fn enqueue_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		origin: <T::MessageProcessor as ProcessMessage>::Origin,
	) {
		Self::do_enqueue_messages(&origin, messages);
		let book_state = BookStateFor::<T>::get(&origin);
		T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
	}

	fn sweep_queue(origin: MessageOriginOf<T>) {
		if !BookStateFor::<T>::contains_key(&origin) {
			return;
		}
		let mut book_state = BookStateFor::<T>::get(&origin);
		book_state.begin = book_state.end;
		if let Some(neighbours) = book_state.ready_neighbours.take() {
			Self::ready_ring_unknit(&origin, neighbours);
		}
		BookStateFor::<T>::insert(&origin, &book_state);
	}
}

impl<T: Config> QueueFootprintQuery<MessageOriginOf<T>> for Pezpallet<T> {
	type MaxMessageLen =
		MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

	fn get_batches_footprints<'a>(
		origin: MessageOriginOf<T>,
		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		total_pages_limit: u32,
	) -> BatchesFootprints {
		let mut batches_footprints = BatchesFootprints::default();

		let mut new_page = false;
		let mut total_pages_count = 0;
		let mut current_page_pos: usize = T::HeapSize::get().into() as usize;

		let book = BookStateFor::<T>::get(&origin);
		if book.end > book.begin {
			total_pages_count = book.end - book.begin;
			if let Some(page) = Pages::<T>::get(origin, book.end - 1) {
				current_page_pos = page.heap_pos();
				batches_footprints.first_page_pos = current_page_pos;
			}
		}

		let mut msgs = msgs.peekable();
		while let Some(msg) = msgs.peek() {
			if total_pages_count > total_pages_limit {
				return batches_footprints;
			}

			match Page::<T::Size, T::HeapSize>::can_append_message_at(current_page_pos, msg.len()) {
				Ok(new_pos) => {
					current_page_pos = new_pos;
					batches_footprints.push(msg, new_page);
					new_page = false;
					msgs.next();
				},
				Err(_) => {
					// Would not fit into the current page.
					// We start a new one and try again in the next iteration.
					new_page = true;
					total_pages_count += 1;
					current_page_pos = 0;
				},
			}
		}

		batches_footprints
	}
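The first-fit page-filling loop above can be simulated with plain integers. `count_batches`, the header size and the heap size are all illustrative; the real code records per-batch footprints rather than simple counts, and only checks the page limit once per message:

```rust
/// Count how many messages fit and how many fresh pages they require,
/// following the same first-fit strategy as `get_batches_footprints`:
/// append to the current page while a message fits, otherwise open a new
/// page and retry the same message (toy model, hypothetical parameters).
fn count_batches(
    heap_size: usize,
    header_size: usize,
    mut current_pos: usize,
    msgs: &[usize],
    total_pages_limit: u32,
) -> (u32, u32) {
    let mut new_pages = 0u32;
    let mut accepted = 0u32;
    // A partially filled current page counts towards the total.
    let mut total_pages_count = if current_pos > 0 { 1 } else { 0 };
    for &len in msgs {
        loop {
            if total_pages_count > total_pages_limit {
                return (accepted, new_pages);
            }
            if current_pos + header_size + len <= heap_size {
                // Fits into the current page: append header + payload.
                current_pos += header_size + len;
                accepted += 1;
                break;
            }
            // Would not fit: open a fresh page and retry the same message.
            new_pages += 1;
            total_pages_count += 1;
            current_pos = 0;
        }
    }
    (accepted, new_pages)
}
```

An over-long message keeps opening empty pages until the page limit cuts the loop short, mirroring how the real loop bails out via `total_pages_limit`.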

	fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
		BookStateFor::<T>::get(&origin).into()
	}
}