use crate::{
configuration::{self, HostConfiguration},
initializer, paras, FeeTracker, GetMinFeeFactor,
};
use alloc::vec::Vec;
use core::fmt;
use pezframe_support::pezpallet_prelude::*;
use pezframe_system::pezpallet_prelude::BlockNumberFor;
use pezkuwi_primitives::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
use pezsp_core::MAX_POSSIBLE_ALLOCATION;
use pezsp_runtime::{
traits::{BlakeTwo256, Hash as HashT, SaturatedConversion},
FixedU128,
};
use xcm::latest::SendError;
pub use pezpallet::*;
#[cfg(test)]
mod tests;
/// Divisor applied to the maximum queue length to obtain the fee threshold.
///
/// When a message is enqueued and the queue is then longer than
/// `dmq_max_length(..) / THRESHOLD_FACTOR`, the delivery fee factor of the para is increased;
/// when the queue is pruned down to that length or below, the factor is decreased.
const THRESHOLD_FACTOR: u32 = 2;
/// Reasons why a downward message cannot be placed into a para's queue.
#[derive(Debug)]
pub enum QueueDownwardMessageError {
/// The message is larger than the configured `max_downward_message_size`. The same variant
/// is also returned when the recipient's queue is already longer than its maximum length.
ExceedsMaxMessageSize,
/// The recipient para has no entry in `paras::Heads`, so the message cannot be routed.
Unroutable,
}
impl From<QueueDownwardMessageError> for SendError {
fn from(err: QueueDownwardMessageError) -> Self {
match err {
QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
QueueDownwardMessageError::Unroutable => SendError::Unroutable,
}
}
}
/// Reasons for rejecting a reported `processed_downward_messages` count, as produced by
/// `check_processed_downward_messages`.
pub(crate) enum ProcessedDownwardMessagesAcceptanceErr {
/// No message was reported as processed although the oldest queued message was sent at or
/// before the relay parent.
AdvancementRule,
/// More messages were reported as processed than the queue currently holds.
Underflow { processed_downward_messages: u32, dmq_length: u32 },
}
impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
use ProcessedDownwardMessagesAcceptanceErr::*;
match *self {
AdvancementRule => {
write!(fmt, "DMQ is not empty, but processed_downward_messages is 0",)
},
Underflow { processed_downward_messages, dmq_length } => write!(
fmt,
"processed_downward_messages = {}, but dmq_length is only {}",
processed_downward_messages, dmq_length,
),
}
}
}
// Pezpallet definition: declares the storage holding per-para downward message queues,
// the hash-chain head of each queue and the per-para delivery fee factor.
#[pezframe_support::pezpallet]
pub mod pezpallet {
use super::*;
/// Marker type the pezpallet's functions and trait implementations hang off.
#[pezpallet::pezpallet]
#[pezpallet::without_storage_info]
pub struct Pezpallet<T>(_);
/// Configuration of the pezpallet; it adds no items of its own and only requires the
/// system, `configuration` and `paras` configurations.
#[pezpallet::config]
pub trait Config: pezframe_system::Config + configuration::Config + paras::Config {}
/// The downward messages queued for each para, oldest first (new messages are pushed to
/// the back, processed ones are pruned from the front).
#[pezpallet::storage]
pub type DownwardMessageQueues<T: Config> = StorageMap<
_,
Twox64Concat,
ParaId,
Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
ValueQuery,
>;
/// The head of the hash chain over each para's downward message queue.
///
/// Every enqueued message replaces the head with the `BlakeTwo256` hash of
/// `(previous_head, sent_at, hash_of_message)`.
#[pezpallet::storage]
pub(crate) type DownwardMessageQueueHeads<T: Config> =
StorageMap<_, Twox64Concat, ParaId, Hash, ValueQuery>;
/// The factor applied to the delivery fee for each para; falls back to
/// `GetMinFeeFactor<Pezpallet<T>>` when no entry is stored.
#[pezpallet::storage]
pub(crate) type DeliveryFeeFactor<T: Config> =
StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, GetMinFeeFactor<Pezpallet<T>>>;
}
impl<T: Config> Pezpallet<T> {
pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
Weight::zero()
}
pub(crate) fn initializer_finalize() {}
pub(crate) fn initializer_on_new_session(
_notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
outgoing_paras: &[ParaId],
) {
Self::perform_outgoing_para_cleanup(outgoing_paras);
}
fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
for outgoing_para in outgoing {
Self::clean_dmp_after_outgoing(outgoing_para);
}
}
fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
DownwardMessageQueues::<T>::remove(outgoing_para);
DownwardMessageQueueHeads::<T>::remove(outgoing_para);
}
pub fn can_queue_downward_message(
config: &HostConfiguration<BlockNumberFor<T>>,
para: &ParaId,
msg: &DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
let serialized_len = msg.len() as u32;
if serialized_len > config.max_downward_message_size {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize);
}
if Self::dmq_length(*para) > Self::dmq_max_length(config.max_downward_message_size) {
return Err(QueueDownwardMessageError::ExceedsMaxMessageSize);
}
if !paras::Heads::<T>::contains_key(para) {
return Err(QueueDownwardMessageError::Unroutable);
}
Ok(())
}
pub fn queue_downward_message(
config: &HostConfiguration<BlockNumberFor<T>>,
para: ParaId,
msg: DownwardMessage,
) -> Result<(), QueueDownwardMessageError> {
let serialized_len = msg.len();
Self::can_queue_downward_message(config, ¶, &msg)?;
let inbound = InboundDownwardMessage {
msg,
sent_at: pezframe_system::Pezpallet::<T>::block_number(),
};
DownwardMessageQueueHeads::<T>::mutate(para, |head| {
let new_head =
BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
*head = new_head;
});
let q_len = DownwardMessageQueues::<T>::mutate(para, |v| {
v.push(inbound);
v.len()
});
let threshold =
Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
if q_len > (threshold as usize) {
Self::increase_fee_factor(para, serialized_len as u128);
}
Ok(())
}
pub(crate) fn check_processed_downward_messages(
para: ParaId,
relay_parent_number: BlockNumberFor<T>,
processed_downward_messages: u32,
) -> Result<(), ProcessedDownwardMessagesAcceptanceErr> {
let dmq_length = Self::dmq_length(para);
if dmq_length > 0 && processed_downward_messages == 0 {
let contents = Self::dmq_contents(para);
if contents.get(0).map_or(false, |msg| msg.sent_at <= relay_parent_number) {
return Err(ProcessedDownwardMessagesAcceptanceErr::AdvancementRule);
}
}
if dmq_length < processed_downward_messages {
return Err(ProcessedDownwardMessagesAcceptanceErr::Underflow {
processed_downward_messages,
dmq_length,
});
}
Ok(())
}
pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) {
let q_len = DownwardMessageQueues::<T>::mutate(para, |q| {
let processed_downward_messages = processed_downward_messages as usize;
if processed_downward_messages > q.len() {
q.clear();
} else {
*q = q.split_off(processed_downward_messages);
}
q.len()
});
let config = configuration::ActiveConfig::<T>::get();
let threshold =
Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
if q_len <= (threshold as usize) {
Self::decrease_fee_factor(para);
}
}
#[cfg(test)]
fn dmq_mqc_head(para: ParaId) -> Hash {
DownwardMessageQueueHeads::<T>::get(¶)
}
pub(crate) fn dmq_length(para: ParaId) -> u32 {
DownwardMessageQueues::<T>::decode_len(¶)
.unwrap_or(0)
.saturated_into::<u32>()
}
fn dmq_max_length(max_downward_message_size: u32) -> u32 {
MAX_POSSIBLE_ALLOCATION.checked_div(max_downward_message_size).unwrap_or(0)
}
pub(crate) fn dmq_contents(
recipient: ParaId,
) -> Vec<InboundDownwardMessage<BlockNumberFor<T>>> {
DownwardMessageQueues::<T>::get(&recipient)
}
#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
pub fn make_teyrchain_reachable(para: impl Into<ParaId>) {
let para = para.into();
crate::paras::Heads::<T>::insert(para, para.encode());
}
}
/// Fee-factor bookkeeping for downward messages, keyed by recipient para and backed by the
/// `DeliveryFeeFactor` storage map.
impl<T: Config> FeeTracker for Pezpallet<T> {
    type Id = ParaId;

    /// Reads the delivery fee factor currently stored for `para`.
    fn get_fee_factor(para: Self::Id) -> FixedU128 {
        <DeliveryFeeFactor<T>>::get(para)
    }

    /// Overwrites the delivery fee factor stored for `para` with `factor`.
    fn set_fee_factor(para: Self::Id, factor: FixedU128) {
        DeliveryFeeFactor::<T>::set(para, factor);
    }
}
/// Benchmark-only hook: ensures the given para is accepted as a recipient of downward
/// messages by delegating to `make_teyrchain_reachable`.
#[cfg(feature = "runtime-benchmarks")]
impl<T: Config> crate::EnsureForTeyrchain for Pezpallet<T> {
    fn ensure(id: ParaId) {
        Pezpallet::<T>::make_teyrchain_reachable(id);
    }
}