use crate::{
configuration,
inclusion::{
tests::run_to_block_default_notifications as run_to_block, AggregateMessageOrigin,
AggregateMessageOrigin::Ump, UmpAcceptanceCheckErr, UmpQueueId,
},
mock::{
assert_last_event, assert_last_events, new_test_ext, MessageQueue, MessageQueueSize,
MockGenesisConfig, ParaInclusion, Processed, System, Test, *,
},
};
use pezframe_support::{
assert_noop, assert_ok,
pezpallet_prelude::*,
traits::{ExecuteOverweightError, QueueFootprintQuery, ServiceQueues},
weights::Weight,
};
use pezkuwi_primitives::{
well_known_keys, ClaimQueueOffset, CoreSelector, Id as ParaId, UMPSignal, UpwardMessage,
UMP_SEPARATOR,
};
use pezsp_crypto_hashing::{blake2_256, twox_64};
use pezsp_runtime::traits::Bounded;
/// Builder for the mock genesis configuration used by the UMP tests.
///
/// Each field overrides the host-configuration value of the same name when
/// `build` is called.
pub(super) struct GenesisConfigBuilder {
/// Value written to `max_upward_message_size` of the host configuration.
max_upward_message_size: u32,
/// Value written to `max_upward_message_num_per_candidate` of the host configuration.
max_upward_message_num_per_candidate: u32,
/// Value written to `max_upward_queue_count` of the host configuration.
max_upward_queue_count: u32,
/// Value written to `max_upward_queue_size` of the host configuration.
max_upward_queue_size: u32,
}
impl Default for GenesisConfigBuilder {
    /// Small limits that keep the tests short: 16-byte messages, 2 messages per
    /// candidate, and a queue of at most 4 messages / 64 bytes.
    fn default() -> Self {
        let max_upward_message_size = 16;
        let max_upward_message_num_per_candidate = 2;
        let max_upward_queue_count = 4;
        let max_upward_queue_size = 64;

        Self {
            max_upward_message_size,
            max_upward_message_num_per_candidate,
            max_upward_queue_count,
            max_upward_queue_size,
        }
    }
}
impl GenesisConfigBuilder {
    /// Same as the default builder, but allows up to 128 queued messages.
    pub(super) fn large_queue_count() -> Self {
        let defaults = Self::default();
        Self { max_upward_queue_count: 128, ..defaults }
    }

    /// Consumes the builder and returns the default mock genesis config with the
    /// four UMP limits replaced by the builder's values.
    pub(super) fn build(self) -> crate::mock::MockGenesisConfig {
        let Self {
            max_upward_message_size,
            max_upward_message_num_per_candidate,
            max_upward_queue_count,
            max_upward_queue_size,
        } = self;

        let mut genesis = default_genesis_config();
        {
            let host = &mut genesis.configuration.config;
            host.max_upward_message_size = max_upward_message_size;
            host.max_upward_message_num_per_candidate = max_upward_message_num_per_candidate;
            host.max_upward_queue_count = max_upward_queue_count;
            host.max_upward_queue_size = max_upward_queue_size;
        }
        genesis
    }
}
fn default_genesis_config() -> MockGenesisConfig {
MockGenesisConfig {
configuration: crate::configuration::GenesisConfig {
config: crate::configuration::HostConfiguration {
max_downward_message_size: 1024,
..Default::default()
},
},
..Default::default()
}
}
/// Checks and enqueues a single upward message for `para`.
///
/// # Panics
///
/// Panics if the acceptance check rejects the message.
fn queue_upward_msg(para: ParaId, msg: UpwardMessage) {
    let outcome = try_queue_upward_msg(para, msg);
    outcome.unwrap();
}
/// Runs the acceptance check for a single upward message against the active
/// configuration and, only if it passes, hands the message to the inclusion pallet.
///
/// # Errors
///
/// Returns the acceptance-check error unchanged; nothing is enqueued in that case.
fn try_queue_upward_msg(para: ParaId, msg: UpwardMessage) -> Result<(), UmpAcceptanceCheckErr> {
    let batch = vec![msg];
    let active_config = configuration::ActiveConfig::<Test>::get();

    match ParaInclusion::check_upward_messages(&active_config, para, &batch) {
        Ok(_) => {
            ParaInclusion::receive_upward_messages(para, batch.as_slice());
            Ok(())
        },
        Err(rejection) => Err(rejection),
    }
}
/// Tests for `ParaInclusion::check_upward_messages`, one per acceptance limit.
mod check_upward_messages {
    use super::*;

    const P_0: ParaId = ParaId::new(0u32);
    const P_1: ParaId = ParaId::new(1u32);

    /// Builds an upward message holding the bytes of `data`.
    fn msg(data: &str) -> UpwardMessage {
        data.bytes().collect()
    }

    /// Runs the acceptance check for `msgs` against the active configuration and
    /// asserts that its error (if any) equals `err`.
    fn check(para: ParaId, msgs: Vec<UpwardMessage>, err: Option<UmpAcceptanceCheckErr>) {
        let active_config = configuration::ActiveConfig::<Test>::get();
        let outcome = ParaInclusion::check_upward_messages(&active_config, para, msgs.as_slice());
        assert_eq!(outcome.err(), err);
    }

    /// Enqueues every message in `msgs` for `para`, panicking on the first rejection.
    fn queue(para: ParaId, msgs: Vec<UpwardMessage>) {
        for message in msgs {
            super::queue_upward_msg(para, message);
        }
    }

    /// Single small messages from two different paras are all accepted.
    #[test]
    fn basic_works() {
        let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
        ext.execute_with(|| {
            // Held for the whole closure; presumably asserts that checking alone
            // leaves storage untouched.
            let _g = pezframe_support::StorageNoopGuard::default();

            for (para, payload) in [(P_0, "p0m0"), (P_1, "p1m0"), (P_0, "p0m1"), (P_1, "p1m1")] {
                check(para, vec![msg(payload)], None);
            }
        });
    }

    /// Up to `max_upward_message_num_per_candidate` messages pass; any more are
    /// rejected with `MoreMessagesThanPermitted`.
    #[test]
    fn num_per_candidate_exceeded_error() {
        let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
        ext.execute_with(|| {
            let _g = pezframe_support::StorageNoopGuard::default();
            let permitted =
                configuration::ActiveConfig::<Test>::get().max_upward_message_num_per_candidate;

            // Everything from an empty batch up to exactly the limit is fine.
            for sent in 0..=permitted {
                check(P_0, vec![msg("a"); sent as usize], None);
            }

            // One or more above the limit is reported with the offending count.
            for sent in (permitted + 1)..(permitted + 10) {
                let expected = UmpAcceptanceCheckErr::MoreMessagesThanPermitted { sent, permitted };
                check(P_0, vec![msg("a"); sent as usize], Some(expected));
            }
        });
    }

    /// Messages of up to `max_upward_message_size` bytes pass; a longer one is
    /// rejected with `MessageSize`, reporting the index of the offending message.
    #[test]
    fn size_per_message_exceeded_error() {
        let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
        ext.execute_with(|| {
            let _g = pezframe_support::StorageNoopGuard::default();
            let active_config = configuration::ActiveConfig::<Test>::get();
            let max_size = active_config.max_upward_message_size;
            let max_per_candidate = active_config.max_upward_message_num_per_candidate;

            for len in 1..=max_size {
                check(P_0, vec![vec![0; len as usize]], None);
            }

            for msg_size in (max_size + 1)..(max_size + 10) {
                for goods in 0..max_per_candidate {
                    // `goods` maximum-sized messages followed by a single oversized one.
                    let mut batch = vec![vec![0; max_size as usize]; goods as usize];
                    batch.push(vec![0; msg_size as usize]);

                    let expected =
                        UmpAcceptanceCheckErr::MessageSize { idx: goods, msg_size, max_size };
                    check(P_0, batch, Some(expected));
                }
            }
        });
    }

    /// Once `max_upward_queue_count` messages are queued, further messages are
    /// rejected with `CapacityExceeded`.
    #[test]
    fn queue_count_exceeded_error() {
        let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
        ext.execute_with(|| {
            let limit = configuration::ActiveConfig::<Test>::get().max_upward_queue_count as u64;

            // Fill the queue one message at a time; each must still be accepted.
            (0..limit).for_each(|_| {
                check(P_0, vec![msg("a")], None);
                queue(P_0, vec![msg("a")]);
            });

            // The reported count is the queue length plus the size of the rejected batch.
            for extra in 1..=2u64 {
                check(
                    P_0,
                    vec![msg("a"); extra as usize],
                    Some(UmpAcceptanceCheckErr::CapacityExceeded { count: limit + extra, limit }),
                );
            }
        });
    }

    /// Once `max_upward_queue_size` bytes are queued, further messages are
    /// rejected with `TotalSizeExceeded`.
    #[test]
    fn queue_size_exceeded_error() {
        let mut ext = new_test_ext(GenesisConfigBuilder::large_queue_count().build());
        ext.execute_with(|| {
            let active_config = configuration::ActiveConfig::<Test>::get();
            let limit = active_config.max_upward_queue_size as u64;

            // Preconditions of this test.
            assert_eq!(
                pezpallet_message_queue::ItemHeader::<MessageQueueSize>::max_encoded_len(),
                5
            );
            assert!(
                active_config.max_upward_queue_size
                    < crate::inclusion::MaxUmpMessageLenOf::<Test>::get(),
                "Test will not work"
            );

            // Fill the queue with one-byte messages until the size limit is reached.
            (0..limit).for_each(|_| {
                check(P_0, vec![msg("1")], None);
                queue(P_0, vec![msg("1")]);
            });

            // The reported total is the queued size plus the rejected payload's length.
            for payload in ["1", "123456"] {
                let total_size = limit + payload.len() as u64;
                check(
                    P_0,
                    vec![msg(payload)],
                    Some(UmpAcceptanceCheckErr::TotalSizeExceeded { total_size, limit }),
                );
            }
        });
    }
}
/// Servicing the message queue with nothing enqueued must simply return.
#[test]
fn dispatch_empty() {
    let mut ext = new_test_ext(default_genesis_config());
    ext.execute_with(|| {
        MessageQueue::service_queues(Weight::max_value());
    });
}
/// A single queued message is processed when the queue is serviced with maximum weight.
#[test]
fn dispatch_single_message() {
    let para = ParaId::from(228);
    let payload = 1000u32.encode();

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        queue_upward_msg(para, payload.clone());
        MessageQueue::service_queues(Weight::max_value());

        let processed = Processed::take();
        assert_eq!(processed, vec![(para, payload)]);
    });
}
/// With a budget of 500 per service call, the queues of three paras are drained
/// over several calls, and a message queued in between is picked up as well.
#[test]
fn dispatch_resume_after_exceeding_dispatch_stage_weight() {
    let (a, c, q) = (ParaId::from(128), ParaId::from(228), ParaId::from(911));

    let a_msg_1 = (200u32, "a_msg_1").encode();
    let a_msg_2 = (100u32, "a_msg_2").encode();
    let c_msg_1 = (300u32, "c_msg_1").encode();
    let c_msg_2 = (100u32, "c_msg_2").encode();
    let q_msg = (500u32, "q_msg").encode();

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        let service = || MessageQueue::service_queues(Weight::from_parts(500, 500));

        // Enqueue order matters: q first, then c, then both messages of a.
        for (para, payload) in [(q, &q_msg), (c, &c_msg_1), (a, &a_msg_1), (a, &a_msg_2)] {
            queue_upward_msg(para, payload.clone());
        }

        service();
        assert_eq!(Processed::take(), vec![(q, q_msg)]);

        // A late arrival for c is processed together with c's pending message.
        queue_upward_msg(c, c_msg_2.clone());
        service();
        assert_eq!(Processed::take(), vec![(c, c_msg_1), (c, c_msg_2)]);

        service();
        assert_eq!(Processed::take(), vec![(a, a_msg_1), (a, a_msg_2)]);

        // Nothing is left afterwards.
        service();
        assert_eq!(Processed::take(), vec![]);
    });
}
/// When the budget only suffices for one of two queued messages, the second one
/// stays queued and is processed by the next service call.
#[test]
fn dispatch_keeps_message_after_weight_exhausted() {
    let para = ParaId::from(128);
    let first = (300u32, "a_msg_1").encode();
    let second = (300u32, "a_msg_2").encode();

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        let service = || MessageQueue::service_queues(Weight::from_parts(500, 500));

        for payload in [&first, &second] {
            queue_upward_msg(para, payload.clone());
        }

        service();
        assert_eq!(Processed::take(), vec![(para, first)]);

        service();
        assert_eq!(Processed::take(), vec![(para, second)]);

        service();
        assert_eq!(Processed::take(), vec![]);
    });
}
/// A budget of 900 covers all three 300-weight messages of two paras in a single
/// service call, and they come out in enqueue order.
#[test]
fn dispatch_correctly_handle_remove_of_latest() {
    let (a, b) = (ParaId::from(1991), ParaId::from(1999));
    let a_msg_1 = (300u32, "a_msg_1").encode();
    let a_msg_2 = (300u32, "a_msg_2").encode();
    let b_msg_1 = (300u32, "b_msg_1").encode();

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        for (para, payload) in [(a, &a_msg_1), (a, &a_msg_2), (b, &b_msg_1)] {
            queue_upward_msg(para, payload.clone());
        }

        MessageQueue::service_queues(Weight::from_parts(900, 900));

        let processed = Processed::take();
        assert_eq!(processed, vec![(a, a_msg_1), (a, a_msg_2), (b, b_msg_1)]);
    });
}
/// A message one byte longer than `MaxUmpMessageLenOf` that bypasses the acceptance
/// check is skipped while its neighbours are still processed. With debug assertions
/// enabled the test instead expects the defensive-failure panic.
#[test]
#[cfg_attr(debug_assertions, should_panic = "Defensive failure has been triggered")]
fn queue_enact_too_long_ignored() {
    const P_0: ParaId = ParaId::new(0u32);

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        let oversized_len = crate::inclusion::MaxUmpMessageLenOf::<Test>::get() as usize + 1;

        let first = (300u32, "a_msg_1").encode();
        let too_long = vec![0u8; oversized_len];
        let last = (300u32, "a_msg_3").encode();

        // Deliberately skip `check_upward_messages` so the oversized message gets through.
        let batch = [first.clone(), too_long, last.clone()];
        ParaInclusion::receive_upward_messages(P_0, &batch);

        MessageQueue::service_queues(Weight::from_parts(900, 900));
        assert_eq!(Processed::take(), vec![(P_0, first), (P_0, last)]);
    });
}
/// Checks that the per-para queue size and remaining capacity exposed through the
/// well-known keys follow both the enqueueing and the processing of upward messages.
#[test]
fn relay_dispatch_queue_size_is_updated() {
new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
let cfg = configuration::ActiveConfig::<Test>::get();
for p in 0..100 {
let para = p.into();
// Both messages carry `300 * (100 - p)` as their leading `u32`. Presumably the mock
// processor uses that value as the weight required to process the message (compare
// `overweight_queue_works`, where it matches the reported `weight_used`) — TODO confirm.
let m1 = (300u32 * (100 - p), "m1").encode();
let m2 = (300u32 * (100 - p), "m11").encode();
queue_upward_msg(para, m1);
queue_upward_msg(para, m2);
// Two messages totalling 15 bytes are now queued for this para.
assert_queue_size(para, 2, 15);
assert_queue_remaining(
para,
cfg.max_upward_queue_count - 2,
cfg.max_upward_queue_size - 15,
);
// Service with exactly the weight carried by one of this para's messages.
MessageQueue::service_queues(Weight::from_all(300u64 * (100 - p) as u64));
// One message of 8 bytes must be left, i.e. the remaining capacity grew accordingly.
assert_queue_remaining(
para,
cfg.max_upward_queue_count - 1,
cfg.max_upward_queue_size - 8,
);
}
// Servicing with unlimited weight does not drain these queues: each para still holds
// one 8-byte message.
// NOTE(review): presumably the leftovers were marked overweight during the loop above;
// why the range stops at 98 rather than 100 is not evident from this file — confirm.
for p in 0..98 {
let para = UmpQueueId::Para(p.into());
MessageQueue::service_queues(Weight::from_all(u64::MAX));
let fp = MessageQueue::footprint(AggregateMessageOrigin::Ump(para));
let (para_queue_count, para_queue_size) = (fp.storage.count, fp.storage.size);
assert_eq!(para_queue_count, 1, "count wrong for para: {}", p);
assert_eq!(para_queue_size, 8, "size wrong for para: {}", p);
}
// Explicitly execute the remaining message of every para. The address `(origin, 0, 1)`
// is presumably (origin, page index, message index) — TODO confirm. The result is
// ignored on purpose (`let _`); only the resulting queue state is asserted.
for p in 0..100 {
let para = UmpQueueId::Para(p.into());
let _ = <MessageQueue as ServiceQueues>::execute_overweight(
Weight::from_all(u64::MAX),
(AggregateMessageOrigin::Ump(para.clone()), 0, 1),
);
// Every queue must be empty again and report its full capacity.
assert_queue_remaining(p.into(), cfg.max_upward_queue_count, cfg.max_upward_queue_size);
let fp = MessageQueue::footprint(AggregateMessageOrigin::Ump(para));
let (para_queue_count, para_queue_size) = (fp.storage.count, fp.storage.size);
assert_eq!(para_queue_count, 0, "count wrong for para: {}", p);
assert_eq!(para_queue_size, 0, "size wrong for para: {}", p);
}
});
}
/// Checks that `well_known_keys::relay_dispatch_queue_size` yields the same storage key
/// as a `Twox64Concat` storage map named `RelayDispatchQueueSize` under the `Ump` prefix.
#[test]
fn relay_dispatch_queue_size_key_is_correct() {
#![allow(deprecated)]
// Local alias describing the storage map layout the well-known key is compared against.
#[pezframe_support::storage_alias]
type RelayDispatchQueueSize = StorageMap<Ump, Twox64Concat, ParaId, (u32, u32), ValueQuery>;
for i in 0..1024 {
// Derive a scattered para id from the first four bytes of the hashed loop index.
let para: ParaId = u32::from_ne_bytes(twox_64(&i.encode())[..4].try_into().unwrap()).into();
let well_known = pezkuwi_primitives::well_known_keys::relay_dispatch_queue_size(para);
let aliased = RelayDispatchQueueSize::hashed_key_for(para);
assert_eq!(well_known, aliased, "Old and new key must match");
}
}
/// After each enqueued 3-byte message, the queue size and the remaining capacity
/// must be readable through the well-known storage keys.
#[test]
fn verify_relay_dispatch_queue_size_is_externally_accessible() {
    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        let cfg = configuration::ActiveConfig::<Test>::get();

        for para in 0..10 {
            let para = para.into();

            // Queue two 3-byte messages, verifying the exposed values after each one.
            for queued in 1..=2u32 {
                let queued_bytes = 3 * queued;

                queue_upward_msg(para, vec![0u8; 3]);
                assert_queue_size(para, queued, queued_bytes);
                assert_queue_remaining(
                    para,
                    cfg.max_upward_queue_count - queued,
                    cfg.max_upward_queue_size - queued_bytes,
                );
            }
        }
    });
}
/// Asserts that the dispatch queue of `para` holds `count` messages totalling `size`
/// bytes, as seen through the well-known storage key.
///
/// The value is read twice: once as raw bytes that are decoded into `(u32, u32)`, and
/// once through the typed helper. Both reads must agree with the expected pair.
///
/// # Panics
///
/// Panics if the storage entry is missing, cannot be decoded, or differs from
/// `(count, size)`.
fn assert_queue_size(para: ParaId, count: u32, size: u32) {
    // A `\` at the end of a line in a string literal swallows the newline and the
    // leading whitespace of the next line, so the separating space has to come
    // before the backslash.
    const MISSING_QUEUE: &str = "enqueuing a message should create the dispatch queue \
        and it should be accessible via the well known keys";

    // The well-known key helpers used here are marked deprecated, hence the `allow`s.
    #[allow(deprecated)]
    let raw_queue_size = pezsp_io::storage::get(&well_known_keys::relay_dispatch_queue_size(para))
        .expect(MISSING_QUEUE);
    let (c, s) = <(u32, u32)>::decode(&mut &raw_queue_size[..])
        .expect("the dispatch queue size should be decodable into (u32, u32)");
    assert_eq!((c, s), (count, size));

    #[allow(deprecated)]
    let (c, s) = well_known_keys::relay_dispatch_queue_size_typed(para)
        .get()
        .expect(MISSING_QUEUE);
    assert_eq!((c, s), (count, size));
}
/// Asserts that the remaining capacity of `para`'s dispatch queue, read through the
/// well-known key, is `count` messages and `size` bytes.
///
/// # Panics
///
/// Panics if the storage entry is missing or either value differs.
fn assert_queue_remaining(para: ParaId, count: u32, size: u32) {
    let stored = well_known_keys::relay_dispatch_queue_remaining_capacity(para).get();
    let (remaining_cnt, remaining_size) = stored.expect("No storage value");

    assert_eq!(remaining_cnt, count, "Wrong number of remaining messages in Q{}", para);
    assert_eq!(remaining_size, size, "Wrong remaining size in Q{}", para);
}
/// Executing an overweight message at an address that was never used fails with
/// `NotFound` and leaves storage untouched.
#[test]
fn service_overweight_unknown() {
    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        let unknown_address = (Ump(UmpQueueId::Para(0u32.into())), 0, 0);

        assert_noop!(
            <MessageQueue as ServiceQueues>::execute_overweight(Weight::MAX, unknown_address),
            ExecuteOverweightError::NotFound,
        );
    });
}
/// Messages that exceed the service budget are reported as overweight and can later
/// be executed explicitly, exactly once, given enough weight.
#[test]
fn overweight_queue_works() {
    let para_a = ParaId::from(2021);

    let a_msg_1 = (301u32, "a_msg_1").encode();
    let a_msg_2 = (501u32, "a_msg_2").encode();
    let a_msg_3 = (501u32, "a_msg_3").encode();

    let hash_1 = blake2_256(&a_msg_1[..]);
    let hash_2 = blake2_256(&a_msg_2[..]);
    let hash_3 = blake2_256(&a_msg_3[..]);

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        // Origin of every message in this test.
        let origin = || Ump(UmpQueueId::Para(para_a));
        // Tries to execute the message at `index` of page 0 with `limit` weight.
        let execute_at = |limit, index| {
            <MessageQueue as ServiceQueues>::execute_overweight(
                Weight::from_parts(limit, limit),
                (origin(), 0, index),
            )
        };

        System::set_block_number(1);

        for payload in [&a_msg_1, &a_msg_2, &a_msg_3] {
            queue_upward_msg(para_a, payload.clone());
        }

        // A budget of 500 covers the first message only; the other two become overweight.
        MessageQueue::service_queues(Weight::from_parts(500, 500));
        let expected_events = [
            pezpallet_message_queue::Event::<Test>::Processed {
                id: hash_1.into(),
                origin: origin(),
                weight_used: Weight::from_parts(301, 301),
                success: true,
            }
            .into(),
            pezpallet_message_queue::Event::<Test>::OverweightEnqueued {
                id: hash_2.into(),
                origin: origin(),
                page_index: 0,
                message_index: 1,
            }
            .into(),
            pezpallet_message_queue::Event::<Test>::OverweightEnqueued {
                id: hash_3.into(),
                origin: origin(),
                page_index: 0,
                message_index: 2,
            }
            .into(),
        ];
        assert_last_events(expected_events.into_iter());
        assert_eq!(Processed::take(), vec![(para_a, a_msg_1)]);

        // Too little weight for the third message: rejected without touching storage.
        assert_noop!(execute_at(500, 2), ExecuteOverweightError::InsufficientWeight);

        // With enough weight it is processed.
        assert_ok!(execute_at(501, 2));
        assert_last_event(
            pezpallet_message_queue::Event::<Test>::Processed {
                id: hash_3.into(),
                origin: origin(),
                weight_used: Weight::from_parts(501, 501),
                success: true,
            }
            .into(),
        );

        // A second attempt on the same message is refused.
        assert_noop!(execute_at(501, 2), ExecuteOverweightError::AlreadyProcessed);

        // An index past the last message does not exist.
        assert_noop!(execute_at(501, 3), ExecuteOverweightError::NotFound);
    });
}
/// A para cannot be deregistered while it still has upward messages queued; once the
/// queue is drained, deregistration succeeds and the para is offboarded.
#[test]
fn cannot_offboard_while_ump_dispatch_queued() {
    let para = 32.into();
    let msg = (300u32, "something").encode();

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        register_teyrchain(para);
        run_to_block(5, vec![4, 5]);

        for _ in 0..2 {
            queue_upward_msg(para, msg.clone());
        }

        // While both messages are pending, every deregistration attempt fails.
        for block in 6..10 {
            assert!(try_deregister_teyrchain(para).is_err());
            run_to_block(block, vec![block]);
            assert!(Paras::is_valid_para(para));
        }

        // Runs the message queue's `on_initialize` and expects exactly one processed message.
        let process_one = || {
            MessageQueue::on_initialize(System::block_number());
            assert_eq!(Processed::take().len(), 1);
        };

        process_one();
        // One message is still queued, so offboarding is still refused.
        assert!(try_deregister_teyrchain(para).is_err());
        process_one();

        // The queue is empty now: deregistration goes through.
        run_to_block(10, vec![10]);
        assert!(Paras::is_valid_para(para));
        assert_ok!(try_deregister_teyrchain(para));
        assert!(Paras::is_offboarding(para));

        run_to_block(11, vec![11]);
        assert!(!Paras::is_valid_para(para));
    });
}
/// A batch made of the maximum number of regular messages followed by the UMP
/// separator and a `SelectCore` signal passes the acceptance check, and only the
/// regular messages end up being processed.
#[test]
fn enqueue_ump_signals() {
    let para = 100.into();

    new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| {
        register_teyrchain(para);
        run_to_block(5, vec![4, 5]);

        let config = configuration::ActiveConfig::<Test>::get();

        // As many regular messages as a single candidate is allowed to send.
        let mut messages: Vec<_> = (0..config.max_upward_message_num_per_candidate)
            .map(|_| "msg".encode())
            .collect();
        // Captured before the separator and signal are added: those two entries
        // must not show up among the processed messages.
        let expected_messages: Vec<_> =
            messages.iter().cloned().map(|msg| (para, msg)).collect();

        messages.push(UMP_SEPARATOR);
        messages.push(UMPSignal::SelectCore(CoreSelector(0), ClaimQueueOffset(0)).encode());

        ParaInclusion::check_upward_messages(&config, para, &messages).unwrap();
        ParaInclusion::receive_upward_messages(para, &messages);

        MessageQueue::service_queues(Weight::max_value());
        assert_eq!(Processed::take(), expected_messages);
    });
}
/// While a para is offboarding, its upward messages are rejected; afterwards the
/// para is no longer valid.
#[test]
fn cannot_enqueue_ump_while_offboarding() {
    let para = 32.into();
    let msg = (300u32, "something").encode();

    let mut ext = new_test_ext(GenesisConfigBuilder::default().build());
    ext.execute_with(|| {
        register_teyrchain(para);
        run_to_block(5, vec![4, 5]);

        // The para is offboarding and a freshly sent message is refused.
        let assert_rejected_while_offboarding = || {
            assert!(Paras::is_offboarding(para));
            assert!(try_queue_upward_msg(para, msg.clone()).is_err());
        };

        assert_ok!(try_deregister_teyrchain(para));
        assert_rejected_while_offboarding();

        run_to_block(6, vec![6]);
        assert_rejected_while_offboarding();

        run_to_block(7, vec![7]);
        assert!(!Paras::is_valid_para(para));
    });
}