#![deny(missing_docs)]
#![cfg_attr(not(feature = "std"), no_std)]
mod benchmarking;
mod integration_test;
mod mock;
pub mod mock_helpers;
mod tests;
pub mod weights;
extern crate alloc;
use alloc::vec::Vec;
use codec::{Codec, ConstEncodedLen, Decode, DecodeWithMemTracking, Encode, MaxEncodedLen};
use core::{fmt::Debug, ops::Deref};
use frame_support::{
defensive,
pallet_prelude::*,
traits::{
BatchesFootprints, Defensive, DefensiveSaturating, DefensiveTruncateFrom, EnqueueMessage,
ExecuteOverweightError, Footprint, ProcessMessage, ProcessMessageError, QueueFootprint,
QueueFootprintQuery, QueuePausedQuery, ServiceQueues,
},
BoundedSlice, CloneNoBound, DefaultNoBound,
};
use frame_system::pallet_prelude::*;
pub use pallet::*;
use scale_info::TypeInfo;
use sp_arithmetic::traits::{BaseArithmetic, Unsigned};
use sp_core::{defer, H256};
use sp_runtime::{
traits::{One, Zero},
SaturatedConversion, Saturating, TransactionOutcome,
};
use sp_weights::WeightMeter;
pub use weights::WeightInfo;
/// The index of a page within a queue's book.
type PageIndex = u32;

/// The header of a single message item inside a [`Page`] heap.
///
/// Precedes the payload bytes of every enqueued message.
#[derive(Encode, Decode, PartialEq, MaxEncodedLen, Debug)]
pub struct ItemHeader<Size> {
    /// Length (in bytes) of the payload that follows this header in the heap.
    payload_len: Size,
    /// Whether this message was already processed (and can thus be skipped).
    is_processed: bool,
}

/// The header has a constant encoded length whenever its `Size` type does.
impl<Size: ConstEncodedLen> ConstEncodedLen for ItemHeader<Size> {}
/// A page of messages belonging to a single queue.
///
/// The `heap` stores back-to-back encoded `(ItemHeader, payload)` pairs and is
/// bounded in total size by `HeapSize`.
#[derive(CloneNoBound, Encode, Decode, DebugNoBound, DefaultNoBound, TypeInfo, MaxEncodedLen)]
#[scale_info(skip_type_params(HeapSize))]
#[codec(mel_bound(Size: MaxEncodedLen))]
pub struct Page<Size: Into<u32> + Debug + Clone + Default, HeapSize: Get<Size>> {
    /// Number of messages in this page that are not yet processed.
    remaining: Size,
    /// Total payload size (in bytes) of all unprocessed messages in this page.
    remaining_size: Size,
    /// The queue-wide message index of the first message in this page.
    first_index: Size,
    /// Heap offset of the first (possibly processed) item still tracked.
    first: Size,
    /// Heap offset at which the last appended item starts.
    last: Size,
    /// The back-to-back encoded `(ItemHeader, payload)` pairs.
    heap: BoundedVec<u8, IntoU32<HeapSize, Size>>,
}
impl<
Size: BaseArithmetic + Unsigned + Copy + Into<u32> + Codec + MaxEncodedLen + Debug + Default,
HeapSize: Get<Size>,
> Page<Size, HeapSize>
where
ItemHeader<Size>: ConstEncodedLen,
{
/// Create a new page containing exactly one `message`.
///
/// The message is prefixed by its `ItemHeader` and copied into a fresh heap.
fn from_message<T: Config>(message: BoundedSlice<u8, MaxMessageLenOf<T>>) -> Self {
    let payload_len = message.len();
    let data_len = ItemHeader::<Size>::max_encoded_len().saturating_add(payload_len);
    let payload_len = payload_len.saturated_into();
    let header = ItemHeader::<Size> { payload_len, is_processed: false };

    // Reserve space for header + payload in one allocation, then write both.
    let mut heap = Vec::with_capacity(data_len);
    header.using_encoded(|h| heap.extend_from_slice(h));
    heap.extend_from_slice(message.deref());

    Page {
        remaining: One::one(),
        remaining_size: payload_len,
        first_index: Zero::zero(),
        first: Zero::zero(),
        last: Zero::zero(),
        // `MaxMessageLenOf` guarantees the data fits; truncation is defensive only.
        heap: BoundedVec::defensive_truncate_from(heap),
    }
}
/// Returns the heap position at which the next appended item would start.
fn heap_pos(&self) -> usize {
    // The heap is a `Vec`, so the next free position is simply its length.
    self.heap.len()
}
/// Check whether a message of `message_len` bytes (plus its item header) fits
/// into the page heap when written at position `pos`.
///
/// Returns the position just past the appended item on success, `Err(())` if
/// the heap bound would be exceeded.
fn can_append_message_at(pos: usize, message_len: usize) -> Result<usize, ()> {
    // Saturating additions of non-negative terms are order-insensitive, so the
    // end position can be computed in one chain.
    let end = pos
        .saturating_add(ItemHeader::<Size>::max_encoded_len())
        .saturating_add(message_len);
    if end > HeapSize::get().into() as usize {
        return Err(());
    }
    Ok(end)
}
/// Try to append a single `message` to this page; `Err(())` if it does not fit.
fn try_append_message<T: Config>(
    &mut self,
    message: BoundedSlice<u8, MaxMessageLenOf<T>>,
) -> Result<(), ()> {
    let pos = self.heap_pos();
    Self::can_append_message_at(pos, message.len())?;
    let payload_len = message.len().saturated_into();
    let header = ItemHeader::<Size> { payload_len, is_processed: false };

    // Take the heap out of its bounded wrapper, append header + payload, put it back.
    let mut heap = core::mem::take(&mut self.heap).into_inner();
    header.using_encoded(|h| heap.extend_from_slice(h));
    heap.extend_from_slice(message.deref());
    // Cannot actually truncate: `can_append_message_at` checked the bound above.
    self.heap = BoundedVec::defensive_truncate_from(heap);
    self.last = pos.saturated_into();
    self.remaining.saturating_inc();
    self.remaining_size.saturating_accrue(payload_len);
    Ok(())
}
/// Peek the payload of the first still-present message, if any.
///
/// Returns `None` when the page is exhausted or — defensively — when the heap
/// data cannot be decoded.
fn peek_first(&self) -> Option<BoundedSlice<'_, u8, IntoU32<HeapSize, Size>>> {
    if self.first > self.last {
        return None;
    }
    // Clamp to the heap length to stay safe against a corrupted offset.
    let f = (self.first.into() as usize).min(self.heap.len());
    let mut item_slice = &self.heap[f..];
    if let Ok(h) = ItemHeader::<Size>::decode(&mut item_slice) {
        let payload_len = h.payload_len.into() as usize;
        if payload_len <= item_slice.len() {
            // `decode` advanced `item_slice` past the header already.
            return Some(BoundedSlice::defensive_truncate_from(&item_slice[..payload_len]));
        }
    }
    defensive!("message-queue: heap corruption");
    None
}
/// Advance past the first message, optionally marking it as processed.
///
/// When `is_processed` is true and the item was not yet marked, its header is
/// rewritten in place and the `remaining` counters are reduced.
fn skip_first(&mut self, is_processed: bool) {
    let f = (self.first.into() as usize).min(self.heap.len());
    if let Ok(mut h) = ItemHeader::decode(&mut &self.heap[f..]) {
        if is_processed && !h.is_processed {
            h.is_processed = true;
            // Overwrite the header bytes in place.
            h.using_encoded(|d| self.heap[f..f + d.len()].copy_from_slice(d));
            self.remaining.saturating_dec();
            self.remaining_size.saturating_reduce(h.payload_len);
        }
        // Move `first` past header + payload and bump the message index.
        self.first
            .saturating_accrue(ItemHeader::<Size>::max_encoded_len().saturated_into());
        self.first.saturating_accrue(h.payload_len);
        self.first_index.saturating_inc();
    }
}
/// Peek the message at position `index` within this page.
///
/// Returns `(heap_pos, is_processed, payload)` or `None` when out of bounds or
/// undecodable. Linear in `index`, since items must be walked one by one.
fn peek_index(&self, index: usize) -> Option<(usize, bool, &[u8])> {
    let mut pos = 0;
    let mut item_slice = &self.heap[..];
    let header_len: usize = ItemHeader::<Size>::max_encoded_len().saturated_into();
    // Skip over the first `index` items.
    for _ in 0..index {
        let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
        let item_len = h.payload_len.into() as usize;
        if item_slice.len() < item_len {
            return None;
        }
        item_slice = &item_slice[item_len..];
        pos.saturating_accrue(header_len.saturating_add(item_len));
    }
    // Decode the target item's header and bounds-check its payload.
    let h = ItemHeader::<Size>::decode(&mut item_slice).ok()?;
    if item_slice.len() < h.payload_len.into() as usize {
        return None;
    }
    item_slice = &item_slice[..h.payload_len.into() as usize];
    Some((pos, h.is_processed, item_slice))
}
/// Mark the message at heap position `pos` as processed and update counters.
///
/// A no-op when the header cannot be decoded or the item is already processed.
fn note_processed_at_pos(&mut self, pos: usize) {
    if let Ok(mut h) = ItemHeader::<Size>::decode(&mut &self.heap[pos..]) {
        if !h.is_processed {
            h.is_processed = true;
            // Rewrite the header bytes in place.
            h.using_encoded(|d| self.heap[pos..pos + d.len()].copy_from_slice(d));
            self.remaining.saturating_dec();
            self.remaining_size.saturating_reduce(h.payload_len);
        }
    }
}
/// Returns whether every message in this page has been processed.
fn is_complete(&self) -> bool {
    self.remaining.is_zero()
}
}
/// The neighbouring queues of a queue within the (doubly-linked) ready ring.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, Debug, PartialEq)]
pub struct Neighbours<MessageOrigin> {
    /// The previous queue in the ready ring.
    prev: MessageOrigin,
    /// The next queue in the ready ring.
    next: MessageOrigin,
}
/// The bookkeeping state of a single message queue.
#[derive(Clone, Encode, Decode, MaxEncodedLen, TypeInfo, Debug)]
pub struct BookState<MessageOrigin> {
    /// The first page that may still hold unprocessed messages.
    begin: PageIndex,
    /// One past the last page; pages in `begin..end` are considered "ready".
    end: PageIndex,
    /// Total number of pages kept in storage (including stale ones before `begin`).
    count: PageIndex,
    /// Neighbours in the ready ring; `Some` iff this queue is in the ready ring.
    ready_neighbours: Option<Neighbours<MessageOrigin>>,
    /// Number of unprocessed messages over all pages.
    message_count: u64,
    /// Total byte size of unprocessed messages over all pages.
    size: u64,
}
impl<MessageOrigin> Default for BookState<MessageOrigin> {
fn default() -> Self {
Self { begin: 0, end: 0, count: 0, ready_neighbours: None, message_count: 0, size: 0 }
}
}
impl<MessageOrigin> From<BookState<MessageOrigin>> for QueueFootprint {
    /// Summarize a book into its externally visible footprint.
    fn from(book: BookState<MessageOrigin>) -> Self {
        // Ready pages are exactly the half-open range `begin..end`.
        let ready_pages = book.end.defensive_saturating_sub(book.begin);
        let storage = Footprint { count: book.message_count, size: book.size };
        QueueFootprint { pages: book.count, ready_pages, storage }
    }
}
/// Handler for the number/size of messages in a queue changing.
pub trait OnQueueChanged<Id> {
    /// Note that the queue `id` now has the given footprint.
    fn on_queue_changed(id: Id, fp: QueueFootprint);
}

/// No-op implementation.
impl<Id> OnQueueChanged<Id> for () {
    fn on_queue_changed(_: Id, _: QueueFootprint) {}
}
/// Allows forcing the processing head to a specific queue.
pub trait ForceSetHead<O> {
    /// Set the service head to `origin`.
    ///
    /// Returns whether the head was actually set. May consume from `weight`.
    fn force_set_head(weight: &mut WeightMeter, origin: &O) -> Result<bool, ()>;
}
#[frame_support::pallet]
pub mod pallet {
    use super::*;

    #[pallet::pallet]
    pub struct Pallet<T>(_);

    /// Configuration trait of the message-queue pallet.
    #[pallet::config]
    pub trait Config: frame_system::Config {
        /// The overarching runtime event type.
        #[allow(deprecated)]
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;

        /// Weight information for this pallet's operations.
        type WeightInfo: WeightInfo;

        /// The handler that actually processes dequeued messages.
        type MessageProcessor: ProcessMessage;

        /// The type used for sizes and offsets within a page heap.
        type Size: BaseArithmetic
            + Unsigned
            + Copy
            + Into<u32>
            + Member
            + Encode
            + Decode
            + MaxEncodedLen
            + ConstEncodedLen
            + TypeInfo
            + Default;

        /// Called whenever a queue's footprint changes.
        type QueueChangeHandler: OnQueueChanged<<Self::MessageProcessor as ProcessMessage>::Origin>;

        /// Queried to decide whether servicing a queue is currently paused.
        type QueuePausedQuery: QueuePausedQuery<<Self::MessageProcessor as ProcessMessage>::Origin>;

        /// The size of a page heap; also bounds the size of a single message.
        #[pallet::constant]
        type HeapSize: Get<Self::Size>;

        /// The number of stale pages tolerated before they become cullable via `reap_page`.
        #[pallet::constant]
        type MaxStale: Get<u32>;

        /// Weight budget for servicing queues in `on_initialize`; `None` disables it.
        #[pallet::constant]
        type ServiceWeight: Get<Option<Weight>>;

        /// Maximum weight used for servicing queues in `on_idle`; `None` disables it.
        #[pallet::constant]
        type IdleMaxServiceWeight: Get<Option<Weight>>;
    }

    #[pallet::event]
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
    pub enum Event<T: Config> {
        /// Message discarded because the `MessageProcessor` returned a permanent error.
        ProcessingFailed {
            /// The `blake2_256` hash of the message (possibly overridden by the processor).
            id: H256,
            /// The queue of the message.
            origin: MessageOriginOf<T>,
            /// The error that occurred.
            error: ProcessMessageError,
        },
        /// Message was processed.
        Processed {
            /// The `blake2_256` hash of the message (possibly overridden by the processor).
            id: H256,
            /// The queue of the message.
            origin: MessageOriginOf<T>,
            /// The weight consumed while processing.
            weight_used: Weight,
            /// Whether the processor reported success.
            success: bool,
        },
        /// Message placed in the overweight queue for later manual execution.
        OverweightEnqueued {
            /// The `blake2_256` hash of the message.
            id: [u8; 32],
            /// The queue of the message.
            origin: MessageOriginOf<T>,
            /// The page where the message sits.
            page_index: PageIndex,
            /// The index of the message within the page.
            message_index: T::Size,
        },
        /// A page was reaped.
        PageReaped {
            /// The queue of the reaped page.
            origin: MessageOriginOf<T>,
            /// The index of the reaped page.
            index: PageIndex,
        },
    }

    #[pallet::error]
    pub enum Error<T> {
        /// The page is not yet reapable (still has unprocessed items or is not old enough).
        NotReapable,
        /// The referenced page does not exist.
        NoPage,
        /// The referenced message could not be found.
        NoMessage,
        /// The message was already processed.
        AlreadyProcessed,
        /// The message is still queued for normal servicing; it cannot be executed here.
        Queued,
        /// Not enough weight to execute the message.
        InsufficientWeight,
        /// The message is temporarily unprocessable; retry later.
        TemporarilyUnprocessable,
        /// The queue is paused; no message can be executed from it.
        QueuePaused,
        /// Another call is in progress; re-entrant execution is disallowed.
        RecursiveDisallowed,
    }

    /// The bookkeeping state of every queue.
    #[pallet::storage]
    pub type BookStateFor<T: Config> =
        StorageMap<_, Twox64Concat, MessageOriginOf<T>, BookState<MessageOriginOf<T>>, ValueQuery>;

    /// The queue at which servicing begins next; `None` when no queue is ready.
    #[pallet::storage]
    pub type ServiceHead<T: Config> = StorageValue<_, MessageOriginOf<T>, OptionQuery>;

    /// The map of (queue, page index) to pages.
    #[pallet::storage]
    pub type Pages<T: Config> = StorageDoubleMap<
        _,
        Twox64Concat,
        MessageOriginOf<T>,
        Twox64Concat,
        PageIndex,
        Page<T::Size, T::HeapSize>,
        OptionQuery,
    >;

    #[pallet::hooks]
    impl<T: Config> Hooks<BlockNumberFor<T>> for Pallet<T> {
        fn on_initialize(_n: BlockNumberFor<T>) -> Weight {
            // Only service queues when a weight budget is configured.
            if let Some(weight_limit) = T::ServiceWeight::get() {
                Self::service_queues_impl(weight_limit, ServiceQueuesContext::OnInitialize)
            } else {
                Weight::zero()
            }
        }

        fn on_idle(_n: BlockNumberFor<T>, remaining_weight: Weight) -> Weight {
            if let Some(weight_limit) = T::IdleMaxServiceWeight::get() {
                // Never use more than what is actually left over in the block.
                Self::service_queues_impl(
                    weight_limit.min(remaining_weight),
                    ServiceQueuesContext::OnIdle,
                )
            } else {
                Weight::zero()
            }
        }

        #[cfg(feature = "try-runtime")]
        fn try_state(_: BlockNumberFor<T>) -> Result<(), sp_runtime::TryRuntimeError> {
            Self::do_try_state()
        }

        /// Check the assumptions of the pallet configuration.
        #[cfg(test)]
        fn integrity_test() {
            Self::do_integrity_test().expect("Pallet config is valid; qed")
        }
    }

    #[pallet::call]
    impl<T: Config> Pallet<T> {
        /// Remove a page which has no more messages remaining to be processed or is stale.
        #[pallet::call_index(0)]
        #[pallet::weight(T::WeightInfo::reap_page())]
        pub fn reap_page(
            origin: OriginFor<T>,
            message_origin: MessageOriginOf<T>,
            page_index: PageIndex,
        ) -> DispatchResult {
            ensure_signed(origin)?;
            Self::do_reap_page(&message_origin, page_index)
        }

        /// Execute an overweight message.
        ///
        /// Temporary processing errors are propagated, whereas permanent errors are
        /// treated as success.
        ///
        /// - `origin`: must be `Signed`.
        /// - `message_origin`: the queue holding the message.
        /// - `page`: the page in which the message sits.
        /// - `index`: the index of the message within the page.
        /// - `weight_limit`: the maximum weight the message execution may consume.
        #[pallet::call_index(1)]
        #[pallet::weight(
            T::WeightInfo::execute_overweight_page_updated().max(
                T::WeightInfo::execute_overweight_page_removed()).saturating_add(*weight_limit)
        )]
        pub fn execute_overweight(
            origin: OriginFor<T>,
            message_origin: MessageOriginOf<T>,
            page: PageIndex,
            index: T::Size,
            weight_limit: Weight,
        ) -> DispatchResultWithPostInfo {
            ensure_signed(origin)?;
            let actual_weight =
                Self::do_execute_overweight(message_origin, page, index, weight_limit)?;
            Ok(Some(actual_weight).into())
        }
    }
}
/// The status of servicing a whole page.
#[derive(PartialEq, Debug)]
enum PageExecutionStatus {
    /// Ran out of weight; stop here.
    Bailed,
    /// No progress could be made on the current item; stop here.
    NoProgress,
    /// No more items in this page; the next page can be serviced.
    NoMore,
}
/// The status of servicing a single item of a page.
#[derive(PartialEq, Debug)]
enum ItemExecutionStatus {
    /// Ran out of weight before the item could be attempted.
    Bailed,
    /// The item could not be processed right now; try again later.
    NoProgress,
    /// There was no item to service.
    NoItem,
    /// An item was executed; the boolean states whether it counts as processed.
    Executed(bool),
}
/// The outcome of attempting to process a single message.
#[derive(PartialEq)]
enum MessageExecutionStatus {
    /// Not enough weight remained in the meter to even attempt the message.
    InsufficientWeight,
    /// The message exceeds the per-message weight allowance; parked as overweight.
    Overweight,
    /// The message was processed (successfully or with a reported failure).
    Processed,
    /// The message could not be processed; `permanent` states whether retrying may help.
    Unprocessable { permanent: bool },
    /// The processor hit its recursion/stack limit; retry later.
    StackLimitReached,
}
/// Where a call to `service_queues_impl` originated from.
#[derive(PartialEq)]
enum ServiceQueuesContext {
    /// Called from the `on_idle` hook.
    OnIdle,
    /// Called from the `on_initialize` hook.
    OnInitialize,
    /// Called via the `ServiceQueues::service_queues` trait method.
    ServiceQueues,
}
impl<T: Config> Pallet<T> {
/// Insert `origin` into the ready ring, just before the current head.
///
/// Returns the new neighbours of `origin`, or `Err(())` on inconsistent ring state.
fn ready_ring_knit(origin: &MessageOriginOf<T>) -> Result<Neighbours<MessageOriginOf<T>>, ()> {
    if let Some(head) = ServiceHead::<T>::get() {
        // Splice `origin` between the ring's tail (the head's `prev`) and the head.
        let mut head_book_state = BookStateFor::<T>::get(&head);
        let mut head_neighbours = head_book_state.ready_neighbours.take().ok_or(())?;
        let tail = head_neighbours.prev;
        head_neighbours.prev = origin.clone();
        head_book_state.ready_neighbours = Some(head_neighbours);
        BookStateFor::<T>::insert(&head, head_book_state);

        let mut tail_book_state = BookStateFor::<T>::get(&tail);
        let mut tail_neighbours = tail_book_state.ready_neighbours.take().ok_or(())?;
        tail_neighbours.next = origin.clone();
        tail_book_state.ready_neighbours = Some(tail_neighbours);
        BookStateFor::<T>::insert(&tail, tail_book_state);

        Ok(Neighbours { next: head, prev: tail })
    } else {
        // Ring was empty: `origin` becomes its sole element and the head.
        ServiceHead::<T>::put(origin);
        Ok(Neighbours { next: origin.clone(), prev: origin.clone() })
    }
}
/// Remove `origin` (with the given `neighbours`) from the ready ring.
fn ready_ring_unknit(origin: &MessageOriginOf<T>, neighbours: Neighbours<MessageOriginOf<T>>) {
    if origin == &neighbours.next {
        debug_assert!(
            origin == &neighbours.prev,
            "unknitting from single item ring; outgoing must be only item"
        );
        // Last element of the ring: the ring becomes empty.
        ServiceHead::<T>::kill();
    } else {
        // Bridge the gap: link `prev` and `next` to each other.
        BookStateFor::<T>::mutate(&neighbours.next, |book_state| {
            if let Some(ref mut n) = book_state.ready_neighbours {
                n.prev = neighbours.prev.clone()
            }
        });
        BookStateFor::<T>::mutate(&neighbours.prev, |book_state| {
            if let Some(ref mut n) = book_state.ready_neighbours {
                n.next = neighbours.next.clone()
            }
        });
        // If the removed queue was the head, advance the head.
        if let Some(head) = ServiceHead::<T>::get() {
            if &head == origin {
                ServiceHead::<T>::put(neighbours.next);
            }
        } else {
            defensive!("`ServiceHead` must be some if there was a ready queue");
        }
    }
}
/// Take the head of the ready ring, advancing `ServiceHead` to the next queue.
///
/// Consumes from `weight`; returns `None` when the weight does not suffice or
/// when there is no ready queue.
fn bump_service_head(weight: &mut WeightMeter) -> Option<MessageOriginOf<T>> {
    if weight.try_consume(T::WeightInfo::bump_service_head()).is_err() {
        return None;
    }
    if let Some(head) = ServiceHead::<T>::get() {
        let mut head_book_state = BookStateFor::<T>::get(&head);
        if let Some(head_neighbours) = head_book_state.ready_neighbours.take() {
            // Advance the head pointer; the old head is returned for servicing.
            ServiceHead::<T>::put(&head_neighbours.next);
            Some(head)
        } else {
            defensive!("The head must point to a queue in the ready ring");
            None
        }
    } else {
        None
    }
}
/// Set the service head to `queue` if that queue is in the ready ring.
///
/// Consumes from `weight` in any case; returns whether the head was set.
fn set_service_head(weight: &mut WeightMeter, queue: &MessageOriginOf<T>) -> Result<bool, ()> {
    weight.try_consume(T::WeightInfo::set_service_head()).map_err(|_| ())?;

    // Only queues that are knitted into the ready ring may become the head.
    if BookStateFor::<T>::get(queue).ready_neighbours.is_none() {
        return Ok(false);
    }
    ServiceHead::<T>::put(queue);
    Ok(true)
}
/// The maximal weight that a single message may consume.
///
/// Derived from the larger of `ServiceWeight` and `IdleMaxServiceWeight`, minus
/// the fixed per-message overhead; falls back to `limit` when both are zero.
/// `None` when even the overhead does not fit.
fn max_message_weight(limit: Weight) -> Option<Weight> {
    let service = T::ServiceWeight::get().unwrap_or_default();
    let idle = T::IdleMaxServiceWeight::get().unwrap_or_default();
    let max_weight = if service.any_gt(idle) { service } else { idle };
    let budget = if max_weight.is_zero() { limit } else { max_weight };
    budget.checked_sub(&Self::single_msg_overhead())
}
/// The fixed bookkeeping overhead of servicing a single message.
fn single_msg_overhead() -> Weight {
    // Worst-case page base cost, whether or not the page completes.
    let page_base = T::WeightInfo::service_page_base_completion()
        .max(T::WeightInfo::service_page_base_no_completion());
    T::WeightInfo::bump_service_head()
        .saturating_add(T::WeightInfo::service_queue_base())
        .saturating_add(page_base)
        .saturating_add(T::WeightInfo::service_page_item())
        .saturating_add(T::WeightInfo::ready_ring_unknit())
}
/// Check the pallet configuration for sane weight values (test-only).
#[cfg(test)]
fn do_integrity_test() -> Result<(), String> {
    ensure!(!MaxMessageLenOf::<T>::get().is_zero(), "HeapSize too low");
    let max_block = T::BlockWeights::get().max_block;

    // `ServiceWeight` must at least cover the overhead of one message and fit the block.
    if let Some(service) = T::ServiceWeight::get() {
        if Self::max_message_weight(service).is_none() {
            return Err(format!(
                "ServiceWeight too low: {}. Must be at least {}",
                service,
                Self::single_msg_overhead(),
            ));
        }
        if service.any_gt(max_block) {
            return Err(format!(
                "ServiceWeight {service} is bigger than max block weight {max_block}"
            ));
        }
    }
    if let Some(on_idle) = T::IdleMaxServiceWeight::get() {
        if on_idle.any_gt(max_block) {
            return Err(format!(
                "IdleMaxServiceWeight {on_idle} is bigger than max block weight {max_block}"
            ));
        }
    }
    // The two limits must be comparable, otherwise the larger one is ill-defined.
    if let (Some(service_weight), Some(on_idle)) =
        (T::ServiceWeight::get(), T::IdleMaxServiceWeight::get())
    {
        if !(service_weight.all_gt(on_idle) ||
            on_idle.all_gt(service_weight) ||
            service_weight == on_idle)
        {
            return Err("One of `ServiceWeight` or `IdleMaxServiceWeight` needs to be `all_gt` or both need to be equal.".into());
        }
    }
    Ok(())
}
/// Enqueue all `messages` onto the queue of `origin`, appending to the last page
/// while it has room and opening new pages otherwise.
fn do_enqueue_messages<'a>(
    origin: &MessageOriginOf<T>,
    messages: impl Iterator<Item = BoundedSlice<'a, u8, MaxMessageLenOf<T>>>,
) {
    let mut book_state = BookStateFor::<T>::get(origin);
    let mut maybe_page = None;
    // Check whether there already is a page in progress to append to.
    if book_state.end > book_state.begin {
        debug_assert!(book_state.ready_neighbours.is_some(), "Must be in ready ring if ready");
        maybe_page = Pages::<T>::get(origin, book_state.end - 1).or_else(|| {
            defensive!("Corruption: referenced page doesn't exist.");
            None
        });
    }
    for message in messages {
        // Try to append the message to the current page.
        if let Some(mut page) = maybe_page {
            maybe_page = match page.try_append_message::<T>(message) {
                Ok(_) => Some(page),
                Err(_) => {
                    // Not enough space in the current page: flush it to storage.
                    Pages::<T>::insert(origin, book_state.end - 1, page);
                    None
                },
            }
        }
        // Otherwise, open a new page containing the message.
        if maybe_page.is_none() {
            book_state.end.saturating_inc();
            book_state.count.saturating_inc();
            maybe_page = Some(Page::from_message::<T>(message));
        }
        // Account for the message itself.
        book_state.message_count.saturating_inc();
        book_state
            .size
            .saturating_accrue(message.len() as u64);
    }
    // Flush the last in-progress page.
    if let Some(page) = maybe_page {
        Pages::<T>::insert(origin, book_state.end - 1, page);
    }
    // Knit the queue into the ready ring if it was not there yet.
    if book_state.ready_neighbours.is_none() {
        match Self::ready_ring_knit(origin) {
            Ok(neighbours) => book_state.ready_neighbours = Some(neighbours),
            Err(()) => {
                defensive!("Ring state invalid when knitting");
            },
        }
    }
    BookStateFor::<T>::insert(origin, book_state);
}
/// Execute a previously-skipped (overweight) message.
///
/// Takes the service mutex; fails with [`Error::RecursiveDisallowed`] when
/// called re-entrantly from within message processing.
pub fn do_execute_overweight(
    origin: MessageOriginOf<T>,
    page_index: PageIndex,
    index: T::Size,
    weight_limit: Weight,
) -> Result<Weight, Error<T>> {
    let run = || Self::do_execute_overweight_inner(origin, page_index, index, weight_limit);
    match with_service_mutex(run) {
        Ok(inner) => inner,
        Err(()) => Err(Error::<T>::RecursiveDisallowed),
    }
}
/// Execute an overweight message; assumes the service mutex is already held.
fn do_execute_overweight_inner(
    origin: MessageOriginOf<T>,
    page_index: PageIndex,
    index: T::Size,
    weight_limit: Weight,
) -> Result<Weight, Error<T>> {
    let mut book_state = BookStateFor::<T>::get(&origin);
    ensure!(!T::QueuePausedQuery::is_paused(&origin), Error::<T>::QueuePaused);

    let mut page = Pages::<T>::get(&origin, page_index).ok_or(Error::<T>::NoPage)?;
    let (pos, is_processed, payload) =
        page.peek_index(index.into() as usize).ok_or(Error::<T>::NoMessage)?;
    let payload_len = payload.len() as u64;
    // The message must lie before the current servicing position, i.e. it was
    // skipped over earlier; otherwise it is still `Queued` for normal servicing.
    ensure!(
        page_index < book_state.begin ||
            (page_index == book_state.begin && pos < page.first.into() as usize),
        Error::<T>::Queued
    );
    ensure!(!is_processed, Error::<T>::AlreadyProcessed);
    use MessageExecutionStatus::*;

    let mut weight_counter = WeightMeter::with_limit(weight_limit);
    match Self::process_message_payload(
        origin.clone(),
        page_index,
        index,
        payload,
        &mut weight_counter,
        // With `Weight::MAX` as the overweight limit, an `Overweight` result
        // cannot occur; weight shortfalls surface as `InsufficientWeight`.
        Weight::MAX,
    ) {
        Overweight | InsufficientWeight => Err(Error::<T>::InsufficientWeight),
        StackLimitReached | Unprocessable { permanent: false } => {
            Err(Error::<T>::TemporarilyUnprocessable)
        },
        Unprocessable { permanent: true } | Processed => {
            // Processed (or permanently failed): mark it and update the book.
            page.note_processed_at_pos(pos);
            book_state.message_count.saturating_dec();
            book_state.size.saturating_reduce(payload_len);
            let page_weight = if page.remaining.is_zero() {
                debug_assert!(
                    page.remaining_size.is_zero(),
                    "no messages remaining; no space taken; qed"
                );
                // Drop the now fully processed page entirely.
                Pages::<T>::remove(&origin, page_index);
                debug_assert!(book_state.count >= 1, "page exists, so book must have pages");
                book_state.count.saturating_dec();
                T::WeightInfo::execute_overweight_page_removed()
            } else {
                Pages::<T>::insert(&origin, page_index, page);
                T::WeightInfo::execute_overweight_page_updated()
            };
            BookStateFor::<T>::insert(&origin, &book_state);
            T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
            Ok(weight_counter.consumed().saturating_add(page_weight))
        },
    }
}
/// Reap the given page of the given queue.
///
/// Takes the service mutex; re-entrant calls fail with `RecursiveDisallowed`.
fn do_reap_page(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
    match with_service_mutex(|| Self::do_reap_page_inner(origin, page_index)) {
        Ok(inner) => inner,
        Err(()) => Err(Error::<T>::RecursiveDisallowed.into()),
    }
}
/// Reap `page_index` of `origin`'s queue; assumes the service mutex is held.
fn do_reap_page_inner(origin: &MessageOriginOf<T>, page_index: PageIndex) -> DispatchResult {
    let mut book_state = BookStateFor::<T>::get(origin);
    // Only pages before the current servicing position can be reaped.
    ensure!(page_index < book_state.begin, Error::<T>::NotReapable);

    let page = Pages::<T>::get(origin, page_index).ok_or(Error::<T>::NoPage)?;
    // A page is reapable once all of its messages have been processed...
    let reapable = page.remaining.is_zero();
    // ...or cullable when more than `MaxStale` stale pages exist and this page
    // lies far enough below `begin` (the allowed backlog shrinks as the
    // overflow grows, but never below `MaxStale`).
    let cullable = || {
        let total_pages = book_state.count;
        let ready_pages = book_state.end.saturating_sub(book_state.begin).min(total_pages);
        // Stale pages are those still kept in storage before `begin`.
        let stale_pages = total_pages - ready_pages;
        let max_stale = T::MaxStale::get();
        // `overflow` = number of stale pages beyond the allowance (>= 1 when any).
        let overflow = match stale_pages.checked_sub(max_stale + 1) {
            Some(x) => x + 1,
            None => return false,
        };
        let backlog = (max_stale * max_stale / overflow).max(max_stale);
        let watermark = book_state.begin.saturating_sub(backlog);
        page_index < watermark
    };
    ensure!(reapable || cullable(), Error::<T>::NotReapable);

    Pages::<T>::remove(origin, page_index);
    debug_assert!(book_state.count > 0, "reaping a page implies there are pages");
    book_state.count.saturating_dec();
    book_state.message_count.saturating_reduce(page.remaining.into() as u64);
    book_state.size.saturating_reduce(page.remaining_size.into() as u64);
    BookStateFor::<T>::insert(origin, &book_state);
    T::QueueChangeHandler::on_queue_changed(origin.clone(), book_state.into());
    Self::deposit_event(Event::PageReaped { origin: origin.clone(), index: page_index });
    Ok(())
}
/// Service as many messages of queue `origin` as the weight meter allows.
///
/// Returns whether any messages were processed and the next ready queue, if any.
fn service_queue(
    origin: MessageOriginOf<T>,
    weight: &mut WeightMeter,
    overweight_limit: Weight,
) -> (bool, Option<MessageOriginOf<T>>) {
    use PageExecutionStatus::*;
    // Reserve the base cost plus a potential ring unknit up front.
    if weight
        .try_consume(
            T::WeightInfo::service_queue_base()
                .saturating_add(T::WeightInfo::ready_ring_unknit()),
        )
        .is_err()
    {
        return (false, None);
    }

    let mut book_state = BookStateFor::<T>::get(&origin);
    let mut total_processed = 0;
    if T::QueuePausedQuery::is_paused(&origin) {
        // Skip paused queues, but still report the next ready one.
        let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
        return (false, next_ready);
    }

    while book_state.end > book_state.begin {
        let (processed, status) =
            Self::service_page(&origin, &mut book_state, weight, overweight_limit);
        total_processed.saturating_accrue(processed);
        match status {
            // Stop here; the page may still hold unprocessed items.
            Bailed | NoProgress => break,
            // The page is done; move on to the next one.
            NoMore => (),
        };
        book_state.begin.saturating_inc();
    }

    let next_ready = book_state.ready_neighbours.as_ref().map(|x| x.next.clone());
    if book_state.begin >= book_state.end {
        // No ready pages remain: take the queue out of the ready ring.
        if let Some(neighbours) = book_state.ready_neighbours.take() {
            Self::ready_ring_unknit(&origin, neighbours);
        } else if total_processed > 0 {
            defensive!("Freshly processed queue must have been ready");
        }
    }
    BookStateFor::<T>::insert(&origin, &book_state);
    if total_processed > 0 {
        T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
    }
    (total_processed > 0, next_ready)
}
/// Service as many messages of page `book_state.begin` as the meter allows.
///
/// Returns the number of processed messages and the page's execution status.
fn service_page(
    origin: &MessageOriginOf<T>,
    book_state: &mut BookStateOf<T>,
    weight: &mut WeightMeter,
    overweight_limit: Weight,
) -> (u32, PageExecutionStatus) {
    use PageExecutionStatus::*;
    // Reserve the worst-case base cost of servicing a page.
    if weight
        .try_consume(
            T::WeightInfo::service_page_base_completion()
                .max(T::WeightInfo::service_page_base_no_completion()),
        )
        .is_err()
    {
        return (0, Bailed);
    }

    let page_index = book_state.begin;
    let mut page = match Pages::<T>::get(origin, page_index) {
        Some(p) => p,
        None => {
            defensive!("message-queue: referenced page not found");
            return (0, NoMore);
        },
    };

    let mut total_processed = 0;
    // Execute as many items as possible until we bail or run out of items.
    let status = loop {
        use ItemExecutionStatus::*;
        match Self::service_page_item(
            origin,
            page_index,
            book_state,
            &mut page,
            weight,
            overweight_limit,
        ) {
            Bailed => break PageExecutionStatus::Bailed,
            NoItem => break PageExecutionStatus::NoMore,
            NoProgress => break PageExecutionStatus::NoProgress,
            // Keep going as long as progress is made.
            Executed(true) => total_processed.saturating_inc(),
            Executed(false) => (),
        }
    };

    if page.is_complete() {
        debug_assert!(status != Bailed, "we never bail if a page became complete");
        // A fully processed page is removed from storage.
        Pages::<T>::remove(origin, page_index);
        debug_assert!(book_state.count > 0, "completing a page implies there are pages");
        book_state.count.saturating_dec();
    } else {
        Pages::<T>::insert(origin, page_index, page);
    }
    (total_processed, status)
}
/// Attempt to execute the first unprocessed item of `page`.
///
/// Storage is committed before and re-read after the message is processed,
/// since the processor may (per the defensive note below) re-enter and enqueue
/// further messages, but is not allowed to remove pages.
pub(crate) fn service_page_item(
    origin: &MessageOriginOf<T>,
    page_index: PageIndex,
    book_state: &mut BookStateOf<T>,
    page: &mut PageOf<T>,
    weight: &mut WeightMeter,
    overweight_limit: Weight,
) -> ItemExecutionStatus {
    use MessageExecutionStatus::*;
    // Pre-check completion so that a completed page never reports `Bailed`.
    if page.is_complete() {
        return ItemExecutionStatus::NoItem;
    }
    if weight.try_consume(T::WeightInfo::service_page_item()).is_err() {
        return ItemExecutionStatus::Bailed;
    }

    let payload = &match page.peek_first() {
        Some(m) => m,
        None => return ItemExecutionStatus::NoItem,
    }[..];
    let payload_len = payload.len() as u64;

    // Commit current state before processing: a re-entrant call must observe it.
    Pages::<T>::insert(origin, page_index, &*page);
    BookStateFor::<T>::insert(origin, &*book_state);

    let res = Self::process_message_payload(
        origin.clone(),
        page_index,
        page.first_index,
        payload,
        weight,
        overweight_limit,
    );

    // Re-read state that a recursive enqueue may have modified.
    *book_state = BookStateFor::<T>::get(origin);
    if let Some(new_page) = Pages::<T>::get(origin, page_index) {
        *page = new_page;
    } else {
        defensive!("page must exist since we just inserted it and recursive calls are not allowed to remove anything");
        return ItemExecutionStatus::NoItem;
    };

    let is_processed = match res {
        InsufficientWeight => return ItemExecutionStatus::Bailed,
        Unprocessable { permanent: false } => return ItemExecutionStatus::NoProgress,
        Processed | Unprocessable { permanent: true } | StackLimitReached => true,
        Overweight => false,
    };

    if is_processed {
        book_state.message_count.saturating_dec();
        // Fix: `payload_len` is already `u64`; the former `as u64` cast was redundant.
        book_state.size.saturating_reduce(payload_len);
    }
    page.skip_first(is_processed);
    ItemExecutionStatus::Executed(is_processed)
}
/// Ensure the correctness of the state of this pallet.
///
/// Checks storage iteration consistency, per-book sanity bounds and the ready
/// ring: every ready queue must have messages, valid neighbours, and pages
/// whose `remaining` counter matches a recount of unprocessed items.
#[cfg(any(test, feature = "try-runtime", feature = "std"))]
pub fn do_try_state() -> Result<(), sp_runtime::TryRuntimeError> {
    ensure!(
        BookStateFor::<T>::iter_keys().count() == BookStateFor::<T>::iter_values().count(),
        "Memory Corruption in BookStateFor"
    );
    ensure!(
        Pages::<T>::iter_keys().count() == Pages::<T>::iter_values().count(),
        "Memory Corruption in Pages"
    );
    // Basic sanity of every book.
    for book in BookStateFor::<T>::iter_values() {
        ensure!(book.end >= book.begin, "Invariant");
        ensure!(book.end < 1 << 30, "Likely overflow or corruption");
        ensure!(book.message_count < 1 << 30, "Likely overflow or corruption");
        ensure!(book.size < 1 << 30, "Likely overflow or corruption");
        ensure!(book.count < 1 << 30, "Likely overflow or corruption");
        let fp: QueueFootprint = book.into();
        ensure!(fp.ready_pages <= fp.pages, "There cannot be more ready than total pages");
    }
    // Walk the ready ring exactly once, starting at the current head.
    let Some(starting_origin) = ServiceHead::<T>::get() else { return Ok(()) };
    while let Some(head) = Self::bump_service_head(&mut WeightMeter::new()) {
        ensure!(
            BookStateFor::<T>::contains_key(&head),
            "Service head must point to an existing book"
        );
        let head_book_state = BookStateFor::<T>::get(&head);
        ensure!(
            head_book_state.message_count > 0,
            "There must be some messages if in ReadyRing"
        );
        ensure!(head_book_state.size > 0, "There must be some message size if in ReadyRing");
        ensure!(
            head_book_state.end > head_book_state.begin,
            "End > Begin if unprocessed messages exists"
        );
        ensure!(
            head_book_state.ready_neighbours.is_some(),
            "There must be neighbours if in ReadyRing"
        );
        // A self-referencing `next` is only valid for a one-element ring.
        if head_book_state.ready_neighbours.as_ref().unwrap().next == head {
            ensure!(
                head_book_state.ready_neighbours.as_ref().unwrap().prev == head,
                "Can only happen if only queue in ReadyRing"
            );
        }
        // Recount unprocessed messages of every ready page.
        for page_index in head_book_state.begin..head_book_state.end {
            let page = Pages::<T>::get(&head, page_index).unwrap();
            let remaining_messages = page.remaining;
            let mut counted_remaining_messages: u32 = 0;
            ensure!(
                remaining_messages > 0.into(),
                "These must be some messages that have not been processed yet!"
            );
            for i in 0..u32::MAX {
                if let Some((_, processed, _)) = page.peek_index(i as usize) {
                    if !processed {
                        counted_remaining_messages += 1;
                    }
                } else {
                    break;
                }
            }
            ensure!(
                remaining_messages.into() == counted_remaining_messages,
                "Memory Corruption"
            );
        }
        // Stop once the walk is back at the starting queue.
        if head_book_state.ready_neighbours.as_ref().unwrap().next == starting_origin {
            break;
        }
    }
    Ok(())
}
/// Build a human-readable dump of all queues, pages and messages.
///
/// Intended for tests and debugging only.
#[cfg(feature = "std")]
pub fn debug_info() -> String {
    let mut info = String::new();
    for (origin, book_state) in BookStateFor::<T>::iter() {
        let mut queue = format!("queue {:?}:\n", &origin);
        let mut pages = Pages::<T>::iter_prefix(&origin).collect::<Vec<_>>();
        pages.sort_by(|(a, _), (b, _)| a.cmp(b));
        for (page_index, mut page) in pages.into_iter() {
            // Mark the page at the front of the book with `>`.
            let page_info = if book_state.begin == page_index { ">" } else { " " };
            let mut page_info = format!(
                "{} page {} ({:?} first, {:?} last, {:?} remain): [ ",
                page_info, page_index, page.first, page.last, page.remaining
            );
            for i in 0..u32::MAX {
                if let Some((_, processed, message)) =
                    page.peek_index(i.try_into().expect("std-only code"))
                {
                    let msg = String::from_utf8_lossy(message);
                    // Processed messages are prefixed with `*`.
                    if processed {
                        page_info.push('*');
                    }
                    page_info.push_str(&format!("{:?}, ", msg));
                    page.skip_first(true);
                } else {
                    break;
                }
            }
            page_info.push_str("]\n");
            queue.push_str(&page_info);
        }
        info.push_str(&queue);
    }
    info
}
/// Process a single message payload via `T::MessageProcessor`.
///
/// Runs inside a storage transaction: changes are committed on `Ok` results
/// and rolled back on errors. Deposits the matching event.
fn process_message_payload(
    origin: MessageOriginOf<T>,
    page_index: PageIndex,
    message_index: T::Size,
    message: &[u8],
    meter: &mut WeightMeter,
    overweight_limit: Weight,
) -> MessageExecutionStatus {
    // The message id defaults to the payload hash; the processor may override it.
    let mut id = sp_io::hashing::blake2_256(message);
    use ProcessMessageError::*;
    let prev_consumed = meter.consumed();

    let transaction =
        storage::with_transaction(|| -> TransactionOutcome<Result<_, DispatchError>> {
            let res =
                T::MessageProcessor::process_message(message, origin.clone(), meter, &mut id);
            match &res {
                Ok(_) => TransactionOutcome::Commit(Ok(res)),
                Err(_) => TransactionOutcome::Rollback(Ok(res)),
            }
        });

    let transaction = match transaction {
        Ok(result) => result,
        _ => {
            defensive!(
                "Error occurred processing message, storage changes will be rolled back"
            );
            return MessageExecutionStatus::Unprocessable { permanent: true };
        },
    };

    match transaction {
        // Message needs more weight than the per-message allowance: park it.
        Err(Overweight(w)) if w.any_gt(overweight_limit) => {
            Self::deposit_event(Event::<T>::OverweightEnqueued {
                id,
                origin,
                page_index,
                message_index,
            });
            MessageExecutionStatus::Overweight
        },
        // Not enough weight left in the meter right now; retry later.
        Err(Overweight(_)) => {
            MessageExecutionStatus::InsufficientWeight
        },
        // The processor wants this queue serviced later; not a failure.
        Err(Yield) => {
            MessageExecutionStatus::Unprocessable { permanent: false }
        },
        // Permanent errors: drop the message and report the failure.
        Err(error @ BadFormat | error @ Corrupt | error @ Unsupported) => {
            Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
            MessageExecutionStatus::Unprocessable { permanent: true }
        },
        Err(error @ StackLimitReached) => {
            Self::deposit_event(Event::<T>::ProcessingFailed { id: id.into(), origin, error });
            MessageExecutionStatus::StackLimitReached
        },
        Ok(success) => {
            // Success: report the weight actually used by this message.
            let weight_used = meter.consumed().saturating_sub(prev_consumed);
            Self::deposit_event(Event::<T>::Processed {
                id: id.into(),
                origin,
                weight_used,
                success,
            });
            MessageExecutionStatus::Processed
        },
    }
}
/// Common servicing implementation behind `on_initialize`, `on_idle` and the
/// `ServiceQueues` trait.
fn service_queues_impl(weight_limit: Weight, context: ServiceQueuesContext) -> Weight {
    let mut weight = WeightMeter::with_limit(weight_limit);

    // Maximum weight a single message may consume. Zero when the limit cannot
    // even cover the fixed overhead; only reported defensively on `on_initialize`.
    let overweight_limit = Self::max_message_weight(weight_limit).unwrap_or_else(|| {
        if matches!(context, ServiceQueuesContext::OnInitialize) {
            defensive!("Not enough weight to service a single message.");
        }
        Weight::zero()
    });

    match with_service_mutex(|| {
        let mut next = match Self::bump_service_head(&mut weight) {
            Some(h) => h,
            None => return weight.consumed(),
        };
        // Round-robin through the ready ring until weight runs out or a full
        // cycle passes without progress.
        let mut last_no_progress = None;
        loop {
            let (progressed, n) =
                Self::service_queue(next.clone(), &mut weight, overweight_limit);
            next = match n {
                Some(n) => {
                    if !progressed {
                        // Full no-progress cycle detected: stop.
                        if last_no_progress == Some(n.clone()) {
                            break;
                        }
                        if last_no_progress.is_none() {
                            last_no_progress = Some(next.clone())
                        }
                        n
                    } else {
                        last_no_progress = None;
                        n
                    }
                },
                None => break,
            }
        }
        weight.consumed()
    }) {
        // Re-entrant call: do no servicing, only report consumed weight.
        Err(()) => weight.consumed(),
        Ok(w) => w,
    }
}
}
impl<T: Config> ForceSetHead<MessageOriginOf<T>> for Pallet<T> {
    fn force_set_head(weight: &mut WeightMeter, origin: &MessageOriginOf<T>) -> Result<bool, ()> {
        // Simply delegate to the inherent implementation.
        Self::set_service_head(weight, origin)
    }
}
/// Run `f` exclusively with respect to message-queue servicing.
///
/// Returns `Err(())` when called re-entrantly (the mutex token is already taken).
pub(crate) fn with_service_mutex<F: FnOnce() -> R, R>(f: F) -> Result<R, ()> {
    // An `environmental` token acts as a non-blocking re-entrancy guard.
    environmental::environmental!(token: Option<()>);
    token::using_once(&mut Some(()), || {
        // Take the token; failure to take it means we are already inside.
        let hold = token::with(|t| t.take()).ok_or(()).defensive()?.ok_or(())?;
        // Return the token on all exit paths.
        defer! {
            token::with(|t| {
                *t = Some(hold);
            });
        }
        Ok(f())
    })
}
/// Provides a `Get<u32>` of the maximal encoded length of `T`.
pub struct MaxEncodedLenOf<T>(core::marker::PhantomData<T>);
impl<T: MaxEncodedLen> Get<u32> for MaxEncodedLenOf<T> {
    fn get() -> u32 {
        T::max_encoded_len() as u32
    }
}
/// The maximal message length attainable: heap size minus the item header size.
pub struct MaxMessageLen<Origin, Size, HeapSize>(
    core::marker::PhantomData<(Origin, Size, HeapSize)>,
);
impl<Origin: MaxEncodedLen, Size: MaxEncodedLen + Into<u32>, HeapSize: Get<Size>> Get<u32>
    for MaxMessageLen<Origin, Size, HeapSize>
{
    fn get() -> u32 {
        (HeapSize::get().into()).saturating_sub(ItemHeader::<Size>::max_encoded_len() as u32)
    }
}
/// The maximal message length for the given pallet config.
pub type MaxMessageLenOf<T> =
    MaxMessageLen<MessageOriginOf<T>, <T as Config>::Size, <T as Config>::HeapSize>;
/// The maximal encoded origin length for the given pallet config.
pub type MaxOriginLenOf<T> = MaxEncodedLenOf<MessageOriginOf<T>>;
/// The message origin type of the configured `MessageProcessor`.
pub type MessageOriginOf<T> = <<T as Config>::MessageProcessor as ProcessMessage>::Origin;
/// The heap size of a page, expressed as `u32`.
pub type HeapSizeU32Of<T> = IntoU32<<T as Config>::HeapSize, <T as Config>::Size>;
/// The page type for the given pallet config.
pub type PageOf<T> = Page<<T as Config>::Size, <T as Config>::HeapSize>;
/// The book state type for the given pallet config.
pub type BookStateOf<T> = BookState<MessageOriginOf<T>>;
/// Converts a `Get<O>` into a `Get<u32>` via `O: Into<u32>`.
pub struct IntoU32<T, O>(core::marker::PhantomData<(T, O)>);
impl<T: Get<O>, O: Into<u32>> Get<u32> for IntoU32<T, O> {
    fn get() -> u32 {
        T::get().into()
    }
}
impl<T: Config> ServiceQueues for Pallet<T> {
    type OverweightMessageAddress = (MessageOriginOf<T>, PageIndex, T::Size);

    fn service_queues(weight_limit: Weight) -> Weight {
        Self::service_queues_impl(weight_limit, ServiceQueuesContext::ServiceQueues)
    }

    /// Execute a single overweight message.
    ///
    /// The weight limit must also cover the page bookkeeping overhead, which is
    /// charged up front.
    fn execute_overweight(
        weight_limit: Weight,
        (message_origin, page, index): Self::OverweightMessageAddress,
    ) -> Result<Weight, ExecuteOverweightError> {
        let mut weight = WeightMeter::with_limit(weight_limit);
        // Charge the worst-case page bookkeeping cost up front.
        if weight
            .try_consume(
                T::WeightInfo::execute_overweight_page_removed()
                    .max(T::WeightInfo::execute_overweight_page_updated()),
            )
            .is_err()
        {
            return Err(ExecuteOverweightError::InsufficientWeight);
        }
        // Translate pallet errors into the trait's error type.
        Pallet::<T>::do_execute_overweight(message_origin, page, index, weight.remaining()).map_err(
            |e| match e {
                Error::<T>::InsufficientWeight => ExecuteOverweightError::InsufficientWeight,
                Error::<T>::AlreadyProcessed => ExecuteOverweightError::AlreadyProcessed,
                Error::<T>::QueuePaused => ExecuteOverweightError::QueuePaused,
                Error::<T>::NoPage | Error::<T>::NoMessage | Error::<T>::Queued => {
                    ExecuteOverweightError::NotFound
                },
                Error::<T>::RecursiveDisallowed => ExecuteOverweightError::RecursiveDisallowed,
                _ => ExecuteOverweightError::Other,
            },
        )
    }
}
impl<T: Config> EnqueueMessage<MessageOriginOf<T>> for Pallet<T> {
    type MaxMessageLen =
        MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

    fn enqueue_message(
        message: BoundedSlice<u8, Self::MaxMessageLen>,
        origin: <T::MessageProcessor as ProcessMessage>::Origin,
    ) {
        Self::do_enqueue_messages(&origin, [message].into_iter());
        // Notify the handler with the queue's updated footprint.
        let book_state = BookStateFor::<T>::get(&origin);
        T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
    }

    fn enqueue_messages<'a>(
        messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
        origin: <T::MessageProcessor as ProcessMessage>::Origin,
    ) {
        Self::do_enqueue_messages(&origin, messages);
        let book_state = BookStateFor::<T>::get(&origin);
        T::QueueChangeHandler::on_queue_changed(origin, book_state.into());
    }

    fn sweep_queue(origin: MessageOriginOf<T>) {
        if !BookStateFor::<T>::contains_key(&origin) {
            return;
        }
        let mut book_state = BookStateFor::<T>::get(&origin);
        // Mark all pages as no longer ready by moving `begin` up to `end`...
        book_state.begin = book_state.end;
        // ...and take the queue out of the ready ring.
        if let Some(neighbours) = book_state.ready_neighbours.take() {
            Self::ready_ring_unknit(&origin, neighbours);
        }
        BookStateFor::<T>::insert(&origin, &book_state);
    }
}
impl<T: Config> QueueFootprintQuery<MessageOriginOf<T>> for Pallet<T> {
    type MaxMessageLen =
        MaxMessageLen<<T::MessageProcessor as ProcessMessage>::Origin, T::Size, T::HeapSize>;

    /// Simulate enqueuing `msgs` and report the resulting batch footprints,
    /// stopping once more than `total_pages_limit` pages would be in use.
    fn get_batches_footprints<'a>(
        origin: MessageOriginOf<T>,
        msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
        total_pages_limit: u32,
    ) -> BatchesFootprints {
        let mut batches_footprints = BatchesFootprints::default();
        let mut new_page = false;
        let mut total_pages_count = 0;
        // Start "past the end" of a non-existent page, so the first message opens
        // a new page unless the queue's current last page has room.
        let mut current_page_pos: usize = T::HeapSize::get().into() as usize;

        let book = BookStateFor::<T>::get(&origin);
        if book.end > book.begin {
            total_pages_count = book.end - book.begin;
            if let Some(page) = Pages::<T>::get(origin, book.end - 1) {
                current_page_pos = page.heap_pos();
                batches_footprints.first_page_pos = current_page_pos;
            }
        }

        let mut msgs = msgs.peekable();
        while let Some(msg) = msgs.peek() {
            if total_pages_count > total_pages_limit {
                return batches_footprints;
            }
            match Page::<T::Size, T::HeapSize>::can_append_message_at(current_page_pos, msg.len()) {
                Ok(new_pos) => {
                    current_page_pos = new_pos;
                    batches_footprints.push(msg, new_page);
                    new_page = false;
                    msgs.next();
                },
                Err(_) => {
                    // Message doesn't fit: retry it on a fresh page.
                    new_page = true;
                    total_pages_count += 1;
                    current_page_pos = 0;
                },
            }
        }
        batches_footprints
    }

    fn footprint(origin: MessageOriginOf<T>) -> QueueFootprint {
        BookStateFor::<T>::get(&origin).into()
    }
}