#![cfg(test)]
use crate::{mock::*, *};
use frame_support::{
assert_noop, assert_ok, assert_storage_noop, traits::BatchFootprint, StorageNoopGuard,
};
use rand::{rngs::StdRng, Rng, SeedableRng};
use sp_crypto_hashing::blake2_256;
/// Verifies the mocked `WeightInfo`: `service_queue_base` is zero by default, reflects a value
/// installed through `set_weight`, and is zero again in a later `build_and_execute` run.
#[test]
fn mocked_weight_works() {
// No override installed: the weight is zero.
build_and_execute::<Test>(|| {
assert!(<Test as Config>::WeightInfo::service_queue_base().is_zero());
});
// An override is observable within the same run.
build_and_execute::<Test>(|| {
set_weight("service_queue_base", Weight::MAX);
assert_eq!(<Test as Config>::WeightInfo::service_queue_base(), Weight::MAX);
});
// The override from the previous run is not visible in a fresh run.
build_and_execute::<Test>(|| {
assert!(<Test as Config>::WeightInfo::service_queue_base().is_zero());
});
}
/// Enqueues a few short messages and checks how `service_queues` drains them with a weight
/// budget of 2 per call, first for a single origin and then across several origins.
#[test]
fn enqueue_within_one_page_works() {
build_and_execute::<Test>(|| {
use MessageOrigin::*;
MessageQueue::enqueue_message(msg("a"), Here);
MessageQueue::enqueue_message(msg("b"), Here);
MessageQueue::enqueue_message(msg("c"), Here);
// A budget of 2 is fully consumed and delivers "a" and "b".
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(b"a".to_vec(), Here), (b"b".to_vec(), Here)]);
// "c" is still pending; the footprint reports a single page.
assert_eq!(MessageQueue::footprint(Here).pages, 1);
// Only "c" is left, so just 1 of the offered 2 weight is consumed.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(b"c".to_vec(), Here)]);
// Nothing is queued any more: no weight consumed, nothing processed.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 0.into_weight());
assert!(MessagesProcessed::get().is_empty());
// Batch-enqueue three messages under a different origin.
MessageQueue::enqueue_messages([msg("a"), msg("b"), msg("c")].into_iter(), There);
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(
MessagesProcessed::take(),
vec![(b"a".to_vec(), There), (b"b".to_vec(), There),]
);
// Add a message for a third origin while "c" of `There` is still pending.
MessageQueue::enqueue_message(msg("d"), Everywhere(1));
// One call handles both remaining messages; the next call finds nothing to do.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(MessageQueue::service_queues(2.into_weight()), 0.into_weight());
// `MessagesProcessed` was not taken in between, so it holds "c" followed by "d".
assert_eq!(
MessagesProcessed::take(),
vec![(b"c".to_vec(), There), (b"d".to_vec(), Everywhere(1))]
);
});
}
/// Tracks the ring reported by `assert_ring` while three origins receive messages and are then
/// serviced with a weight budget of 2 per call.
#[test]
fn queue_priority_retains() {
build_and_execute::<Test>(|| {
use MessageOrigin::*;
// The ring starts out empty and grows by one origin per first enqueue.
assert_ring(&[]);
MessageQueue::enqueue_message(msg("a"), Everywhere(1));
assert_ring(&[Everywhere(1)]);
MessageQueue::enqueue_message(msg("b"), Everywhere(2));
assert_ring(&[Everywhere(1), Everywhere(2)]);
MessageQueue::enqueue_message(msg("c"), Everywhere(3));
assert_ring(&[Everywhere(1), Everywhere(2), Everywhere(3)]);
// A second message for an origin that is already in the ring leaves the ring unchanged.
MessageQueue::enqueue_message(msg("d"), Everywhere(2));
assert_ring(&[Everywhere(1), Everywhere(2), Everywhere(3)]);
// First pass: "a" and "b" are processed.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(
MessagesProcessed::take(),
vec![(vmsg("a"), Everywhere(1)), (vmsg("b"), Everywhere(2)),]
);
// `Everywhere(1)` has been drained and is gone from the ring.
assert_ring(&[Everywhere(2), Everywhere(3)]);
// Second pass: "d" of `Everywhere(2)` is processed before "c" of `Everywhere(3)`.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(
MessagesProcessed::get(),
vec![(vmsg("d"), Everywhere(2)), (vmsg("c"), Everywhere(3)),]
);
// Everything has been processed; the ring is empty again.
assert_ring(&[]);
});
}
/// Checks the processing order when an origin that was already fully serviced receives a new
/// message before the remaining origins have been drained.
#[test]
fn queue_priority_reset_once_serviced() {
build_and_execute::<Test>(|| {
use MessageOrigin::*;
MessageQueue::enqueue_message(msg("a"), Everywhere(1));
MessageQueue::enqueue_message(msg("b"), Everywhere(2));
MessageQueue::enqueue_message(msg("c"), Everywhere(3));
// Sanity-check the pallet state and print the debug dump to the test output.
MessageQueue::do_try_state().unwrap();
println!("{}", MessageQueue::debug_info());
// First pass with a budget of 2.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
// `Everywhere(2)` gets a new message after the first pass.
MessageQueue::enqueue_message(msg("d"), Everywhere(2));
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
// Accumulated over both passes: "c" of `Everywhere(3)` comes before the late "d".
assert_eq!(
MessagesProcessed::get(),
vec![
(vmsg("a"), Everywhere(1)),
(vmsg("b"), Everywhere(2)),
(vmsg("c"), Everywhere(3)),
(vmsg("d"), Everywhere(2)),
]
);
});
}
/// Services two origins in small weight steps and checks consumed weight, processed messages
/// and the recorded `QueueChanges` after every step.
// NOTE(review): the `QueueChanges` tuples look like (origin, message count, total payload bytes);
// the expected values below are consistent with that — confirm against the mock.
#[test]
fn service_queues_basic_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
// One change is recorded per batch enqueue.
assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]);
// Budget 1: the first message of `Here`.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]);
// Budget 1 again: this time the first message of `There`.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]);
// Budget 2: the rest of `Here`, reported as a single change to (0, 0).
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here), (vmsg("abc"), Here)]);
assert_eq!(QueueChanges::take(), vec![(Here, 0, 0)]);
// Unlimited budget: only the weight of the two remaining messages is consumed.
assert_eq!(MessageQueue::service_queues(Weight::MAX), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There), (vmsg("xyz"), There)]);
assert_eq!(QueueChanges::take(), vec![(There, 0, 0)]);
MessageQueue::do_try_state().unwrap();
});
}
/// Enqueues messages that the mock processor rejects and checks that each servicing step emits
/// the matching `ProcessingFailed` event.
#[test]
fn service_queues_failing_messages_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
set_weight("service_page_item", 1.into_weight());
MessageQueue::enqueue_message(msg("badformat"), Here);
MessageQueue::enqueue_message(msg("corrupt"), Here);
MessageQueue::enqueue_message(msg("unsupported"), Here);
MessageQueue::enqueue_message(msg("stacklimitreached"), Here);
MessageQueue::enqueue_message(msg("yield"), Here);
// The five messages occupy three pages.
assert_pages(&[0, 1, 2]);
// Step 1: "badformat" fails with `BadFormat`.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_last_event::<Test>(
Event::ProcessingFailed {
id: blake2_256(b"badformat").into(),
origin: MessageOrigin::Here,
error: ProcessMessageError::BadFormat,
}
.into(),
);
// Step 2: "corrupt" fails with `Corrupt`.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_last_event::<Test>(
Event::ProcessingFailed {
id: blake2_256(b"corrupt").into(),
origin: MessageOrigin::Here,
error: ProcessMessageError::Corrupt,
}
.into(),
);
// Step 3: "unsupported" fails with `Unsupported`.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_last_event::<Test>(
Event::ProcessingFailed {
id: blake2_256(b"unsupported").into(),
origin: MessageOrigin::Here,
error: ProcessMessageError::Unsupported,
}
.into(),
);
// Step 4: one more unit of weight is consumed; four events exist in total and only
// page 2 is left.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(System::events().len(), 4);
assert_pages(&[2]);
});
}
/// Checks that origins listed in `YieldingQueues` are not serviced, and that they are serviced
/// again once they are removed from that list.
#[test]
fn service_queues_suspension_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
MessageQueue::enqueue_messages(vec![msg("a"), msg("b"), msg("c")].into_iter(), Here);
MessageQueue::enqueue_messages(vec![msg("x"), msg("y"), msg("z")].into_iter(), There);
MessageQueue::enqueue_messages(
vec![msg("m"), msg("n"), msg("o")].into_iter(),
Everywhere(0),
);
assert_eq!(QueueChanges::take(), vec![(Here, 3, 3), (There, 3, 3), (Everywhere(0), 3, 3)]);
// Nothing is yielding yet: `Here` gets serviced first.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
assert_eq!(QueueChanges::take(), vec![(Here, 2, 2)]);
// Make `Here` and `Everywhere(0)` yield; only `There` makes progress from now on.
YieldingQueues::set(vec![Here, Everywhere(0)]);
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
assert_eq!(QueueChanges::take(), vec![(There, 2, 2)]);
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("y"), There), (vmsg("z"), There)]);
// `There` is drained and the other two are yielding: no weight is consumed, repeatedly.
assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());
assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());
// Stop yielding `Here`: its two remaining messages get processed.
YieldingQueues::set(vec![Everywhere(0)]);
assert_eq!(MessageQueue::service_queues(Weight::MAX), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("b"), Here), (vmsg("c"), Here)]);
// `Everywhere(0)` is still yielding.
assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());
// Clear the yield list: the last origin is drained completely.
YieldingQueues::take();
assert_eq!(MessageQueue::service_queues(Weight::MAX), 3.into_weight());
assert_eq!(
MessagesProcessed::take(),
vec![
(vmsg("m"), Everywhere(0)),
(vmsg("n"), Everywhere(0)),
(vmsg("o"), Everywhere(0))
]
);
});
}
/// Expects `service_queues_impl` to panic with "Not enough weight to service a single message."
/// under a configuration that `do_integrity_test` rejects. Compiled only with debug assertions.
#[test]
#[cfg(debug_assertions)]
#[should_panic(expected = "Not enough weight to service a single message.")]
fn service_queues_low_weight_defensive() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// With this default call weight the integrity test reports an error.
DefaultWeightForCall::set(21.into());
assert!(MessageQueue::do_integrity_test().is_err());
MessageQueue::enqueue_message(msg("weight=0"), Here);
// This call is the one expected to hit the panic.
MessageQueue::service_queues_impl(104.into_weight(), ServiceQueuesContext::OnInitialize);
});
}
/// Regression test (named after issue 1873): a message declaring weight 100 that is serviced
/// with a limit of 100 consumes that whole limit and ends up as `OverweightEnqueued`.
#[test]
fn service_queues_regression_1873() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
DefaultWeightForCall::set(20.into());
MessageQueue::enqueue_message(msg("weight=100"), Here);
// The full limit is reported as consumed.
assert_eq!(MessageQueue::service_queues(100.into_weight()), 100.into());
// The message was flagged overweight at page 0, index 0 rather than processed.
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=100"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
}
.into(),
);
});
}
/// Creates `MaxStale + 10` pages that each hold one overweight message, then reaps pages until
/// the locally computed watermark is reached and checks that no further page can be reaped.
#[test]
fn reap_page_permanent_overweight_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// Ten pages more than the stale limit, one message (and one queue change) per page.
let n = (MaxStale::get() + 10) as usize;
for _ in 0..n {
MessageQueue::enqueue_message(msg("weight=200 datadatadata"), Here);
}
assert_eq!(Pages::<Test>::iter().count(), n);
assert_eq!(MessageQueue::footprint(Here).pages, n as u32);
assert_eq!(QueueChanges::take().len(), n);
// Service once with a budget far below the declared message weight of 200.
MessageQueue::service_queues(1.into_weight());
let max_stale = MaxStale::get();
for i in 0..n as u32 {
let b = BookStateFor::<Test>::get(Here);
// Watermark computation done by the test itself; pages at or above it are not reaped.
let stale_pages = n as u32 - i;
let overflow = stale_pages.saturating_sub(max_stale + 1) + 1;
let backlog = (max_stale * max_stale / overflow).max(max_stale);
let watermark = b.begin.saturating_sub(backlog);
if i >= watermark {
break;
}
assert_ok!(MessageQueue::do_reap_page(&Here, i));
// Reaping removes one message and its 23 payload bytes from the book.
assert_eq!(QueueChanges::take(), vec![(Here, b.message_count - 1, b.size - 23)]);
}
// Every page that is left refuses to be reaped and records no queue change.
for (o, i, _) in Pages::<Test>::iter() {
assert_noop!(MessageQueue::do_reap_page(&o, i), Error::<Test>::NotReapable);
assert!(QueueChanges::take().is_empty());
}
assert_eq!(MessageQueue::footprint(Here).pages, 3);
});
}
/// Walks through a queue that mixes overweight and processable messages and checks at each
/// stage which pages exist and which `do_reap_page` calls succeed or fail.
#[test]
fn reaping_overweight_fails_properly() {
use MessageOrigin::*;
// The scenario below is written for a stale limit of exactly two.
assert_eq!(MaxStale::get(), 2, "The stale limit is two");
build_and_execute::<Test>(|| {
// Pages 0..=2 each get an overweight message followed by a short one; pages 3..=5 get
// one "bigbig" message each (as confirmed by `assert_pages` below).
MessageQueue::enqueue_message(msg("weight=200 datadata"), Here);
MessageQueue::enqueue_message(msg("a"), Here);
MessageQueue::enqueue_message(msg("weight=200 datadata"), Here);
MessageQueue::enqueue_message(msg("b"), Here);
MessageQueue::enqueue_message(msg("weight=200 datadata"), Here);
MessageQueue::enqueue_message(msg("c"), Here);
MessageQueue::enqueue_message(msg("bigbig 1 datadata"), Here);
MessageQueue::enqueue_message(msg("bigbig 2 datadata"), Here);
MessageQueue::enqueue_message(msg("bigbig 3 datadata"), Here);
assert_pages(&[0, 1, 2, 3, 4, 5]);
// Process "a" and "b"; no page is reapable yet.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here), (vmsg("b"), Here)]);
for (o, i, _) in Pages::<Test>::iter() {
assert_noop!(MessageQueue::do_reap_page(&o, i), Error::<Test>::NotReapable);
}
assert_pages(&[0, 1, 2, 3, 4, 5]);
// Process "c" and the first "bigbig"; page 3 disappears, still nothing is reapable.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("c"), Here)]);
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 1 datadata"), Here)]);
for (o, i, _) in Pages::<Test>::iter() {
assert_noop!(MessageQueue::do_reap_page(&o, i), Error::<Test>::NotReapable);
}
assert_pages(&[0, 1, 2, 4, 5]);
// After the second "bigbig", page 4 is gone and page 0 becomes reapable.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 2 datadata"), Here)]);
assert_pages(&[0, 1, 2, 5]);
assert_ok!(MessageQueue::do_reap_page(&Here, 0));
// None of the remaining pages can be reaped.
for (o, i, _) in Pages::<Test>::iter() {
assert_noop!(MessageQueue::do_reap_page(&o, i), Error::<Test>::NotReapable);
}
assert_pages(&[1, 2, 5]);
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("bigbig 3 datadata"), Here)]);
// Pages that no longer exist report `NoPage`; existing ones remain `NotReapable`.
assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::<Test>::NoPage);
assert_noop!(MessageQueue::do_reap_page(&Here, 3), Error::<Test>::NoPage);
assert_noop!(MessageQueue::do_reap_page(&Here, 4), Error::<Test>::NoPage);
for (o, i, _) in Pages::<Test>::iter() {
assert_noop!(MessageQueue::do_reap_page(&o, i), Error::<Test>::NotReapable);
}
});
}
/// `service_queue` must leave storage untouched and consume no weight when the meter limit is
/// too small. Three weight configurations are checked.
#[test]
fn service_queue_bails() {
// `service_queue_base` (2) exceeds the limit (1).
build_and_execute::<Test>(|| {
set_weight("service_queue_base", 2.into_weight());
let mut meter = WeightMeter::with_limit(1.into_weight());
assert_storage_noop!(MessageQueue::service_queue(0u32.into(), &mut meter, Weight::MAX));
assert!(meter.consumed().is_zero());
});
// `ready_ring_unknit` (2) exceeds the limit (1).
build_and_execute::<Test>(|| {
set_weight("ready_ring_unknit", 2.into_weight());
let mut meter = WeightMeter::with_limit(1.into_weight());
assert_storage_noop!(MessageQueue::service_queue(0u32.into(), &mut meter, Weight::MAX));
assert!(meter.consumed().is_zero());
});
// Each weight alone (2) would fit into the limit (3), but their sum (4) does not.
build_and_execute::<Test>(|| {
set_weight("service_queue_base", 2.into_weight());
set_weight("ready_ring_unknit", 2.into_weight());
let mut meter = WeightMeter::with_limit(3.into_weight());
assert_storage_noop!(MessageQueue::service_queue(0.into(), &mut meter, Weight::MAX));
assert!(meter.consumed().is_zero());
});
}
/// Services one full page in randomly sized batches and checks, for every batch, the number of
/// processed messages, the number of emitted events and the returned page status.
#[test]
fn service_page_works() {
    use super::integration_test::Test;
    use MessageOrigin::*;
    use PageExecutionStatus::*;

    build_and_execute::<Test>(|| {
        set_weight("service_page_base_completion", 2.into_weight());
        set_weight("service_page_item", 3.into_weight());

        let (page, total) = full_page::<Test>();
        assert!(total >= 10, "pre-condition: need at least 10 msgs per page");
        let mut book = book_for::<Test>(&page);
        Pages::<Test>::insert(Here, 0, page);

        // Fixed seed keeps the batch sizes reproducible.
        let mut rng = rand::rngs::StdRng::seed_from_u64(42);
        let mut left = total;
        while left > 0 {
            let batch = rng.gen_range(0..=left);
            left -= batch;

            // Budget: the base weight of 2 plus 3 + 1 for each message of the batch.
            let budget = ((2 + (3 + 1) * batch) as u64).into_weight();
            let mut meter = WeightMeter::with_limit(budget);
            System::reset_events();

            let (processed, status) =
                crate::Pallet::<Test>::service_page(&Here, &mut book, &mut meter, Weight::MAX);
            assert_eq!(processed as usize, batch);
            assert_eq!(NumMessagesProcessed::take(), batch);
            assert_eq!(System::events().len(), batch);

            // Only the batch that empties the page reports `NoMore`.
            let expected_status = if left == 0 { NoMore } else { Bailed };
            assert_eq!(status, expected_status);
        }
        assert_pages(&[]);
    });
}
/// `service_page` must neither touch storage nor consume weight when the meter cannot cover
/// the configured base weight. Checked for both base-weight variants.
#[test]
fn service_page_bails() {
// `service_page_base_completion` (2) exceeds the limit (1).
build_and_execute::<Test>(|| {
set_weight("service_page_base_completion", 2.into_weight());
let mut meter = WeightMeter::with_limit(1.into_weight());
let (page, _) = full_page::<Test>();
let mut book = book_for::<Test>(&page);
Pages::<Test>::insert(MessageOrigin::Here, 0, page);
assert_storage_noop!(MessageQueue::service_page(
&MessageOrigin::Here,
&mut book,
&mut meter,
Weight::MAX
));
assert!(meter.consumed().is_zero());
});
// `service_page_base_no_completion` (2) exceeds the limit (1).
build_and_execute::<Test>(|| {
set_weight("service_page_base_no_completion", 2.into_weight());
let mut meter = WeightMeter::with_limit(1.into_weight());
let (page, _) = full_page::<Test>();
let mut book = book_for::<Test>(&page);
Pages::<Test>::insert(MessageOrigin::Here, 0, page);
assert_storage_noop!(MessageQueue::service_page(
&MessageOrigin::Here,
&mut book,
&mut meter,
Weight::MAX
));
assert!(meter.consumed().is_zero());
});
}
/// `service_page_item` returns `Bailed` when the per-item weight (11) exceeds the meter
/// limit (10).
#[test]
fn service_page_item_bails() {
build_and_execute::<Test>(|| {
// Guard held until the closure ends; see `StorageNoopGuard` for what it enforces on drop.
let _guard = StorageNoopGuard::default();
let (mut page, _) = full_page::<Test>();
let mut weight = WeightMeter::with_limit(10.into_weight());
let overweight_limit = 10.into_weight();
set_weight("service_page_item", 11.into_weight());
assert_eq!(
MessageQueue::service_page_item(
&MessageOrigin::Here,
0,
&mut book_for::<Test>(&page),
&mut page,
&mut weight,
overweight_limit,
),
ItemExecutionStatus::Bailed
);
});
}
/// Services part of a page, then marks the origin as yielding and checks that no progress is
/// made until the yield is lifted, after which the rest of the page is processed.
#[test]
fn service_page_suspension_works() {
    use super::integration_test::Test;
    use MessageOrigin::*;
    use PageExecutionStatus::*;

    build_and_execute::<Test>(|| {
        let (page, total) = full_page::<Test>();
        assert!(total >= 10, "pre-condition: need at least 10 msgs per page");
        let mut book = book_for::<Test>(&page);
        Pages::<Test>::insert(Here, 0, page);

        // A budget of 5 processes exactly five messages and exhausts the meter.
        let mut meter = WeightMeter::with_limit(5.into_weight());
        let first_status =
            crate::Pallet::<Test>::service_page(&Here, &mut book, &mut meter, Weight::MAX).1;
        assert_eq!(NumMessagesProcessed::take(), 5);
        assert!(meter.remaining().is_zero());
        assert_eq!(first_status, Bailed);
        let outstanding = total - 5;

        // While `Here` is yielding, repeated servicing makes no progress.
        YieldingQueues::set(vec![Here]);
        for _ in 0..5 {
            let yielding_status = crate::Pallet::<Test>::service_page(
                &Here,
                &mut book,
                &mut WeightMeter::new(),
                Weight::MAX,
            )
            .1;
            assert_eq!(yielding_status, NoProgress);
            assert!(NumMessagesProcessed::take().is_zero());
        }

        // Once the yield list is cleared, everything that is left gets processed.
        YieldingQueues::take();
        let final_status = crate::Pallet::<Test>::service_page(
            &Here,
            &mut book,
            &mut WeightMeter::new(),
            Weight::MAX,
        )
        .1;
        assert_eq!(final_status, NoMore);
        assert_eq!(NumMessagesProcessed::take(), outstanding);
        assert!(Pages::<Test>::iter_keys().count().is_zero());
    });
}
/// Bumps the service head 99 times on a three-origin ring and checks that the returned origins
/// cycle through the ring in order while the ring itself stays intact.
#[test]
fn bump_service_head_works() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        build_triple_ring();

        let ring = [Here, There, Everywhere(0)];
        // 99 bumps are 33 complete rounds through the ring.
        for expected in ring.iter().cycle().take(99) {
            let current = MessageQueue::bump_service_head(&mut WeightMeter::new()).unwrap();
            assert_eq!(&current, expected);
        }
        assert_ring(&ring);
    });
}
/// `bump_service_head` returns `None` and consumes nothing when its weight (2) exceeds the
/// meter limit (1).
#[test]
fn bump_service_head_bails() {
build_and_execute::<Test>(|| {
set_weight("bump_service_head", 2.into_weight());
setup_bump_service_head::<Test>(0.into(), 1.into());
// Created after the setup, so only the statements below are covered by the guard.
let _guard = StorageNoopGuard::default();
let mut meter = WeightMeter::with_limit(1.into_weight());
assert!(MessageQueue::bump_service_head(&mut meter).is_none());
assert_eq!(meter.consumed(), 0.into_weight());
});
}
/// Checks the return value of `bump_service_head` and the weight it consumes, both before any
/// head is set up and on a two-element ring.
#[test]
fn bump_service_head_trivial_works() {
build_and_execute::<Test>(|| {
set_weight("bump_service_head", 2.into_weight());
let mut meter = WeightMeter::new();
// Nothing has been set up: the call returns `None` but still consumes its weight of 2.
assert_eq!(MessageQueue::bump_service_head(&mut meter), None, "Cannot bump");
assert_eq!(meter.consumed(), 2.into_weight());
setup_bump_service_head::<Test>(0.into(), 1.into());
// The first bump returns origin 0 and moves the head to origin 1.
assert_eq!(MessageQueue::bump_service_head(&mut meter), Some(0.into()));
assert_eq!(ServiceHead::<Test>::get().unwrap(), 1.into(), "Bumped the head");
assert_eq!(meter.consumed(), 4.into_weight());
// The next bump returns origin 1; each bump adds 2 to the consumed weight.
assert_eq!(MessageQueue::bump_service_head(&mut meter), Some(1.into()), "Its a ring");
assert_eq!(meter.consumed(), 6.into_weight());
});
}
/// With the `ServiceHead` storage value removed, `bump_service_head` must not change storage.
#[test]
fn bump_service_head_no_head_noops() {
build_and_execute::<Test>(|| {
build_triple_ring();
// Remove the head while the rest of the ring setup stays in place.
ServiceHead::<Test>::kill();
assert_storage_noop!(MessageQueue::bump_service_head(&mut WeightMeter::new()));
});
}
/// A successfully executed item consumes the per-item overhead (2) plus the weight declared by
/// the message (3), i.e. 5 in total.
#[test]
fn service_page_item_consumes_correct_weight() {
build_and_execute::<Test>(|| {
let mut page = page::<Test>(b"weight=3");
let mut weight = WeightMeter::with_limit(10.into_weight());
let overweight_limit = 0.into_weight();
set_weight("service_page_item", 2.into_weight());
assert_eq!(
MessageQueue::service_page_item(
&MessageOrigin::Here,
0,
&mut book_for::<Test>(&page),
&mut page,
&mut weight,
overweight_limit
),
ItemExecutionStatus::Executed(true)
);
assert_eq!(weight.consumed(), 5.into_weight());
});
}
/// A message that cannot be executed within the overweight limit is skipped: the call returns
/// `Executed(false)`, only the item overhead is consumed, `OverweightEnqueued` is emitted and
/// the message stays on the page unprocessed.
#[test]
fn service_page_item_skips_perm_overweight_message() {
build_and_execute::<Test>(|| {
let mut page = page::<Test>(b"TooMuch");
let mut weight = WeightMeter::with_limit(2.into_weight());
let overweight_limit = 0.into_weight();
set_weight("service_page_item", 2.into_weight());
assert_eq!(
crate::Pallet::<Test>::service_page_item(
&MessageOrigin::Here,
0,
&mut book_for::<Test>(&page),
&mut page,
&mut weight,
overweight_limit
),
ItemExecutionStatus::Executed(false)
);
// Only the `service_page_item` overhead of 2 has been charged.
assert_eq!(weight.consumed(), 2.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"TooMuch"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
}
.into(),
);
// The message is still at position 0, is not marked as processed and has its payload intact.
let (pos, processed, payload) = page.peek_index(0).unwrap();
assert_eq!(pos, 0);
assert!(!processed);
assert_eq!(payload, b"TooMuch".encode());
});
}
/// `peek_index` returns the heap position, processed flag and payload of every item on a full
/// page after that item has been skipped with `skip_first`.
#[test]
fn peek_index_works() {
    use super::integration_test::Test;

    build_and_execute::<Test>(|| {
        let (mut page, count) = full_page::<Test>();
        // Every item is its header followed by a four byte payload.
        let header_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len();
        let item_len = header_len + 4;

        for index in 0..count {
            // Items at even indices are skipped as processed, the others as unprocessed.
            let even = index % 2 == 0;
            page.skip_first(even);

            let (pos, processed, payload) = page.peek_index(index).unwrap();
            assert_eq!(pos, item_len * index);
            assert_eq!(processed, even);
            assert_eq!(payload, (index as u32).encode());
        }
    });
}
/// Walks a full page with `peek_first` / `skip_first`, then verifies through `peek_index` that
/// the processed flag handed to `skip_first` was stored for every item.
#[test]
fn peek_first_and_skip_first_works() {
    use super::integration_test::Test;

    build_and_execute::<Test>(|| {
        let (mut page, count) = full_page::<Test>();

        for index in 0..count {
            // The item at the front carries the encoded running index as payload.
            let front = page.peek_first().unwrap();
            assert_eq!(front.deref(), (index as u32).encode());
            page.skip_first(index % 2 == 0);
        }
        assert!(page.peek_first().is_none(), "Page must be at the end");

        // Items at even indices were skipped as processed, the others as unprocessed.
        for index in 0..count {
            let marked = page.peek_index(index).unwrap().1;
            assert_eq!(marked, index % 2 == 0);
        }
    });
}
/// Marks every item of a full page as processed via `note_processed_at_pos` and checks that the
/// flag flips and `remaining` drops by one each time; afterwards the page can still be walked
/// with `peek_first` / `skip_first`.
#[test]
fn note_processed_at_pos_works() {
    use super::integration_test::Test;

    build_and_execute::<Test>(|| {
        let (mut page, count) = full_page::<Test>();

        for index in 0..count {
            let (pos, done_before, _) = page.peek_index(index).unwrap();
            assert!(!done_before);
            assert_eq!(page.remaining as usize, count - index);

            page.note_processed_at_pos(pos);

            let done_after = page.peek_index(index).unwrap().1;
            assert!(done_after);
            assert_eq!(page.remaining as usize, count - index - 1);
        }

        // Marking items did not advance the front of the page: all items can still be peeked.
        for _ in 0..count {
            page.peek_first().unwrap();
            page.skip_first(false);
        }
        assert!(page.peek_first().is_none());
    });
}
/// Calling `note_processed_at_pos` a second time for the same position must not change the
/// page: neither the heap bytes nor the `remaining` / `remaining_size` counters.
#[test]
fn note_processed_at_pos_idempotent() {
    let (mut page, _) = full_page::<Test>();
    page.note_processed_at_pos(0);

    // Snapshot taken after the first call; the second call must leave it untouched.
    let original = page.clone();
    page.note_processed_at_pos(0);

    assert_eq!(page.heap, original.heap);
    // Comparing only the heap would miss a double decrement of the counters.
    assert_eq!(page.remaining, original.remaining);
    assert_eq!(page.remaining_size, original.remaining_size);
}
/// `is_complete` only becomes true once every item of the page has been marked as processed,
/// regardless of whether items were skipped or marked in place.
#[test]
fn is_complete_works() {
use super::integration_test::Test; build_and_execute::<Test>(|| {
let (mut page, msgs) = full_page::<Test>();
assert!(msgs > 3, "Boring");
// Encoded size of one item: its header plus a four byte payload.
let msg_enc_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() + 4;
assert!(!page.is_complete());
// First pass: skip the even items as unprocessed, mark the odd ones as processed in place.
for i in 0..msgs {
if i % 2 == 0 {
page.skip_first(false);
} else {
page.note_processed_at_pos(msg_enc_len * i);
}
}
// The even items are still unprocessed, so the page is not complete.
assert!(!page.is_complete());
// Second pass: mark the even items; the page stays incomplete until the last one is marked.
for i in 0..msgs {
if i % 2 == 0 {
assert!(!page.is_complete());
let (pos, _, _) = page.peek_index(i).unwrap();
page.note_processed_at_pos(pos);
}
}
assert!(page.is_complete());
assert_eq!(page.remaining_size, 0);
// Every item now carries the processed flag.
for i in 0..msgs {
let (_, processed, _) = page.peek_index(i).unwrap();
assert!(processed);
}
});
}
/// Builds a page from one message of maximal length and checks the page counters as well as
/// the exact heap layout (encoded header followed by the payload).
#[test]
fn page_from_message_basic_works() {
    let max_len = MaxMessageLenOf::<Test>::get();
    assert!(max_len > 0, "pre-condition unmet");

    let mut payload: BoundedVec<u8, MaxMessageLenOf<Test>> = Default::default();
    payload.bounded_resize(max_len as usize, 123);

    let page = PageOf::<Test>::from_message::<Test>(payload.as_bounded_slice());
    assert_eq!(page.remaining, 1);
    assert_eq!(page.remaining_size as usize, payload.len());
    assert!(page.first_index == 0 && page.first == 0 && page.last == 0);

    // Expected heap: the encoded item header directly followed by the payload bytes.
    let header = ItemHeader::<<Test as Config>::Size> {
        payload_len: payload.len() as u32,
        is_processed: false,
    };
    let mut expected_heap = header.encode();
    expected_heap.extend(payload.deref());
    assert_eq!(page.heap, expected_heap);
}
/// Appends encoded `u32` values to an empty page until it rejects one, then checks the number
/// of accepted messages, the page counters and the exact heap content.
#[test]
fn page_try_append_message_basic_works() {
    use super::integration_test::Test;

    let mut page = PageOf::<Test>::default();
    let mut appended = 0;
    for value in 0..u32::MAX {
        let outcome =
            value.using_encoded(|raw| page.try_append_message::<Test>(raw.try_into().unwrap()));
        if outcome.is_err() {
            break;
        }
        appended += 1;
    }

    // Each item takes one header plus the four payload bytes of an encoded `u32`.
    let item_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u32 + 4;
    let expected_msgs = (<Test as Config>::HeapSize::get()) / item_len;
    assert_eq!(expected_msgs, appended, "Wrong number of messages");
    assert_eq!(page.remaining, appended);
    assert_eq!(page.remaining_size, appended * 4);

    // Rebuild the heap by hand: header and payload for every accepted value, in order.
    let mut expected_heap = Vec::<u8>::new();
    for value in 0..appended {
        let header = ItemHeader::<<Test as Config>::Size> { payload_len: 4, is_processed: false };
        expected_heap.extend(header.encode());
        expected_heap.extend(value.encode());
    }
    assert_eq!(page.heap, expected_heap);
}
/// A single message of maximal length fills the heap completely, so that not even an empty
/// message can be appended afterwards.
#[test]
fn page_try_append_message_max_msg_len_works_works() {
use super::integration_test::Test;
let mut page = PageOf::<Test>::default();
let msg = vec![123u8; MaxMessageLenOf::<Test>::get() as usize];
page.try_append_message::<Test>(BoundedSlice::defensive_truncate_from(&msg))
.unwrap();
// An empty payload is rejected as well.
page.try_append_message::<Test>(BoundedSlice::defensive_truncate_from(&[]))
.unwrap_err();
// The heap is exactly as long as the configured heap size.
assert_eq!(page.heap.len(), <Test as Config>::HeapSize::get() as usize);
}
/// Fills a page with randomly sized messages until less than one header fits, then checks the
/// page counters and the exact heap content.
#[test]
fn page_try_append_message_with_remaining_size_works_works() {
    use super::integration_test::Test;

    let header_size = ItemHeader::<<Test as Config>::Size>::max_encoded_len();
    let heap_size = <Test as Config>::HeapSize::get() as usize;

    let mut page = PageOf::<Test>::default();
    let mut free = heap_size;
    let mut payloads = Vec::new();
    // Fixed seed keeps the payload lengths reproducible.
    let mut rng = StdRng::seed_from_u64(42);

    while free >= header_size {
        // Any payload length that still fits next to its header is allowed, including zero.
        let len = rng.gen_range(0..=(free - header_size));
        let payload = vec![123u8; len];
        page.try_append_message::<Test>(BoundedSlice::defensive_truncate_from(&payload))
            .unwrap();
        free -= len + header_size;
        payloads.push(payload);
    }

    assert!(free < header_size);
    assert_eq!(heap_size - page.heap.len(), free);
    assert_eq!(page.remaining as usize, payloads.len());
    let total_payload: usize = payloads.iter().map(Vec::len).sum();
    assert_eq!(page.remaining_size as usize, total_payload);

    // Rebuild the heap by hand: header and payload for every message, in order.
    let mut expected_heap = Vec::new();
    for payload in payloads {
        let header = ItemHeader::<<Test as Config>::Size> {
            payload_len: payload.len() as u32,
            is_processed: false,
        };
        expected_heap.extend(header.encode());
        expected_heap.extend(payload);
    }
    assert_eq!(page.heap, expected_heap);
}
/// A message of exactly the maximal length can be turned into a page holding one message.
#[test]
fn page_from_message_max_len_works() {
    let max_msg_len: usize = MaxMessageLenOf::<Test>::get() as usize;
    let payload = vec![1; max_msg_len];
    let page = PageOf::<Test>::from_message::<Test>(payload[..].try_into().unwrap());
    assert_eq!(page.remaining, 1);
}
/// Sweeps the three queues of a triple ring one after another and checks the ring and the
/// book's `begin` / `end` after every sweep.
#[test]
fn sweep_queue_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
build_triple_ring();
// Discard the queue changes recorded so far.
QueueChanges::take();
let book = BookStateFor::<Test>::get(Here);
assert!(book.begin != book.end);
// Sweep the origin that currently is the service head.
assert_eq!(ServiceHead::<Test>::get(), Some(Here));
MessageQueue::sweep_queue(Here);
assert_ring(&[There, Everywhere(0)]);
// The book of the swept origin now has `begin == end`.
let book = BookStateFor::<Test>::get(Here);
assert_eq!(book.begin, book.end);
// Sweep an origin that is not the service head.
assert!(ServiceHead::<Test>::get() != Some(Everywhere(0)));
MessageQueue::sweep_queue(Everywhere(0));
assert_ring(&[There]);
let book = BookStateFor::<Test>::get(Everywhere(0));
assert_eq!(book.begin, book.end);
// Sweep the last origin; the ring ends up empty.
MessageQueue::sweep_queue(There);
let book = BookStateFor::<Test>::get(There);
assert_eq!(book.begin, book.end);
assert_ring(&[]);
// None of the sweeps recorded a queue change.
assert!(QueueChanges::take().is_empty());
})
}
/// Sweeping the only queue of a single-element ring clears that book's `ready_neighbours`.
#[test]
fn sweep_queue_wraps_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
build_ring::<Test>(&[Here]);
MessageQueue::sweep_queue(Here);
let book = BookStateFor::<Test>::get(Here);
assert!(book.ready_neighbours.is_none());
});
}
/// Sweeping an origin for which nothing was ever set up must not change storage.
#[test]
fn sweep_queue_invalid_noops() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
assert_storage_noop!(MessageQueue::sweep_queue(Here));
});
}
/// `footprint` reports the message count, size and page count of a book that was written
/// directly to storage.
#[test]
fn footprint_works() {
build_and_execute::<Test>(|| {
let origin = MessageOrigin::Here;
let (page, msgs) = full_page::<Test>();
let book = book_for::<Test>(&page);
// Only the book is stored; the page itself serves as the source of expected values.
BookStateFor::<Test>::insert(origin, book);
let info = MessageQueue::footprint(origin);
assert_eq!(info.storage.count as usize, msgs);
assert_eq!(info.storage.size, page.remaining_size as u64);
assert_eq!(info.pages, 1);
// Writing the book directly and querying the footprint records no queue change.
assert!(QueueChanges::take().is_empty());
})
}
/// `footprint` of an origin without any stored state equals the default footprint.
#[test]
fn footprint_invalid_works() {
build_and_execute::<Test>(|| {
let origin = MessageOrigin::Here;
assert_eq!(MessageQueue::footprint(origin), Default::default());
})
}
/// After sweeping a queue, `footprint` still reports a count, size and page number of one each.
// NOTE(review): presumably these are the values `build_ring` puts into the book — confirm
// against the mock.
#[test]
fn footprint_on_swept_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
build_ring::<Test>(&[Here]);
MessageQueue::sweep_queue(Here);
let fp = MessageQueue::footprint(Here);
assert_eq!((1, 1, 1), (fp.storage.count, fp.storage.size, fp.pages));
})
}
/// Follows the footprint of one origin while two overweight messages are enqueued, flagged as
/// overweight and finally executed through `execute_overweight`.
// NOTE(review): `fp` is a mock helper; its four arguments presumably are
// (pages, ready pages, message count, byte size) — confirm against the mock.
#[test]
fn footprint_num_pages_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
MessageQueue::enqueue_message(msg("weight=200"), Here);
MessageQueue::enqueue_message(msg("weight=300"), Here);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 2, 20));
// A budget of 1 processes nothing; two events are emitted and the second `fp` argument
// drops to zero.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 0.into_weight());
assert_eq!(System::events().len(), 2);
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 2, 20));
// Execute the second message (page 0, index 1) with a sufficient limit.
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(300.into_weight(), (Here, 0, 1))
.unwrap(),
300.into_weight()
);
assert_eq!(MessageQueue::footprint(Here), fp(1, 0, 1, 10));
// Execute the first message (page 0, index 0) as well.
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(200.into_weight(), (Here, 0, 0))
.unwrap(),
200.into_weight()
);
// The queue is empty: the footprint equals both the default and an all-zero `fp`.
assert_eq!(MessageQueue::footprint(Here), Default::default());
assert_eq!(MessageQueue::footprint(Here), fp(0, 0, 0, 0));
// A fresh message makes the footprint non-empty again.
MessageQueue::enqueue_message(msg("weight=3"), Here);
assert_eq!(MessageQueue::footprint(Here), fp(1, 1, 1, 8));
})
}
/// Covers `execute_overweight` for a message that was flagged as overweight: insufficient
/// weight, successful execution, and a repeated attempt after the message is gone.
#[test]
fn execute_overweight_works() {
build_and_execute::<Test>(|| {
set_weight("bump_service_head", 1.into_weight());
set_weight("service_queue_base", 1.into_weight());
set_weight("service_page_base_completion", 1.into_weight());
let origin = MessageOrigin::Here;
MessageQueue::enqueue_message(msg("weight=200"), origin);
let book = BookStateFor::<Test>::get(origin);
assert_eq!(book.message_count, 1);
assert!(Pages::<Test>::contains_key(origin, 0));
// Servicing with a budget of 4 cannot execute the message; it is flagged as overweight.
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(QueueChanges::take(), vec![(origin, 1, 10)]);
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=200"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
}
.into(),
);
// A limit of 5 is not enough: the call fails, the page stays, no queue change is recorded.
let consumed =
<MessageQueue as ServiceQueues>::execute_overweight(5.into_weight(), (origin, 0, 0));
assert_eq!(consumed, Err(ExecuteOverweightError::InsufficientWeight));
assert_eq!(Pages::<Test>::iter().count(), 1);
assert!(QueueChanges::take().is_empty());
// A limit of 200 succeeds and consumes exactly 200.
let consumed =
<MessageQueue as ServiceQueues>::execute_overweight(200.into_weight(), (origin, 0, 0))
.unwrap();
assert_eq!(consumed, 200.into_weight());
assert_eq!(QueueChanges::take(), vec![(origin, 0, 0)]);
// Book and page storage are empty afterwards.
let book = BookStateFor::<Test>::get(origin);
assert_eq!(book.message_count, 0);
assert_eq!(Pages::<Test>::iter().count(), 0);
// Executing the same message again reports `NotFound` and changes nothing.
let consumed =
<MessageQueue as ServiceQueues>::execute_overweight(70.into_weight(), (origin, 0, 0));
assert_eq!(consumed, Err(ExecuteOverweightError::NotFound));
assert!(QueueChanges::take().is_empty());
assert!(!Pages::<Test>::contains_key(origin, 0), "Page is gone");
assert!(!ServiceHead::<Test>::exists(), "No ready book");
});
}
/// A book whose only message is overweight leaves the ring after servicing, and re-enters it
/// when a new message is enqueued.
#[test]
fn permanently_overweight_book_unknits() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
set_weight("bump_service_head", 1.into_weight());
set_weight("service_queue_base", 1.into_weight());
set_weight("service_page_base_completion", 1.into_weight());
MessageQueue::enqueue_messages([msg("weight=200")].into_iter(), Here);
assert_ring(&[Here]);
// A budget of 8 cannot execute the message; 4 weight is consumed and it is flagged overweight.
assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=200"),
origin: Here,
message_index: 0,
page_index: 0,
}
.into(),
);
// The origin has left the ring although its message and page are still there.
assert_ring(&[]);
assert_eq!(MessagesProcessed::take().len(), 0);
assert_eq!(BookStateFor::<Test>::get(Here).message_count, 1);
assert_eq!(MessageQueue::footprint(Here).pages, 1);
// A new message puts the origin back into the ring.
MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
assert_ring(&[Here]);
// The new message is processed (5 weight consumed) and the origin leaves the ring again.
assert_eq!(MessageQueue::service_queues(8.into_weight()), 5.into_weight());
assert_eq!(MessagesProcessed::take().len(), 1);
assert_ring(&[]);
});
}
/// Like `permanently_overweight_book_unknits`, but with one processable message followed by
/// two overweight ones in the same book.
#[test]
fn permanently_overweight_book_unknits_multiple() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
set_weight("bump_service_head", 1.into_weight());
set_weight("service_queue_base", 1.into_weight());
set_weight("service_page_base_completion", 1.into_weight());
MessageQueue::enqueue_messages(
[msg("weight=1"), msg("weight=200"), msg("weight=200")].into_iter(),
Here,
);
assert_ring(&[Here]);
// First pass: one message is processed and one is flagged overweight; the origin stays
// in the ring.
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(num_overweight_enqueued_events(), 1);
assert_eq!(MessagesProcessed::take().len(), 1);
assert_ring(&[Here]);
// Second pass: the other overweight message is flagged too and the origin leaves the ring.
assert_eq!(MessageQueue::service_queues(8.into_weight()), 4.into_weight());
assert_eq!(num_overweight_enqueued_events(), 2);
assert_eq!(MessagesProcessed::take().len(), 0);
assert_ring(&[]);
// A new processable message brings the origin back until it has been processed.
MessageQueue::enqueue_messages([msg("weight=1")].into_iter(), Here);
assert_ring(&[Here]);
assert_eq!(MessageQueue::service_queues(4.into_weight()), 4.into_weight());
assert_eq!(MessagesProcessed::take().len(), 1);
assert_ring(&[]);
});
}
/// Services a single `weight=200` message with every limit from 50 to 299. With five
/// overheads of 10 each, limits below 250 must flag the message as overweight and all other
/// limits must process it.
#[test]
fn permanently_overweight_limit_is_valid_basic() {
    use MessageOrigin::*;

    for w in 50..300 {
        build_and_execute::<Test>(|| {
            DefaultWeightForCall::set(Weight::MAX);
            set_weight("bump_service_head", 10.into());
            set_weight("service_queue_base", 10.into());
            set_weight("service_page_base_no_completion", 10.into());
            set_weight("service_page_base_completion", 0.into());
            set_weight("service_page_item", 10.into());
            set_weight("ready_ring_unknit", 10.into());

            let m = "weight=200".to_string();
            MessageQueue::enqueue_message(msg(&m), Here);
            MessageQueue::service_queues(w.into());

            let last_event =
                frame_system::Pallet::<Test>::events().into_iter().last().expect("No event");
            // Pick the expected outcome for this limit, then compare once.
            let expected_event = if w < 250 {
                RuntimeEvent::MessageQueue(Event::OverweightEnqueued {
                    id: blake2_256(m.as_bytes()),
                    origin: Here,
                    message_index: 0,
                    page_index: 0,
                })
            } else {
                RuntimeEvent::MessageQueue(Event::Processed {
                    origin: Here,
                    weight_used: 200.into(),
                    id: blake2_256(m.as_bytes()).into(),
                    success: true,
                })
            };
            assert_eq!(last_event.event, expected_event);
        });
    }
}
/// Randomised variant of the basic limit test.
///
/// For ten rounds, five overhead weights are drawn from `0..=10` using a fixed seed (so the
/// test is deterministic). With `o` being their sum, every limit in `o..=o + 300` is tried:
/// limits below `o + 200` must end with `OverweightEnqueued`, all others with `Processed`.
#[test]
fn permanently_overweight_limit_is_valid_fuzzy() {
    use MessageOrigin::*;

    // The payload is constant, so a literal suffices; no `String` is allocated per iteration.
    const MESSAGE: &str = "weight=200";

    let mut rng = StdRng::seed_from_u64(42);
    for _ in 0..10 {
        // Draw order matters for reproducibility: s1, s2, s3, s4, s5.
        let (s1, s2) = (rng.gen_range(0..=10), rng.gen_range(0..=10));
        let (s3, s4) = (rng.gen_range(0..=10), rng.gen_range(0..=10));
        let s5 = rng.gen_range(0..=10);
        // Sum of the randomised overhead weights.
        let o = s1 + s2 + s3 + s4 + s5;

        for w in o..=o + 300 {
            build_and_execute::<Test>(|| {
                DefaultWeightForCall::set(Weight::MAX);
                set_weight("bump_service_head", s1.into());
                set_weight("service_queue_base", s2.into());
                set_weight("service_page_base_no_completion", s3.into());
                set_weight("service_page_base_completion", 0.into());
                set_weight("service_page_item", s4.into());
                set_weight("ready_ring_unknit", s5.into());

                MessageQueue::enqueue_message(msg(MESSAGE), Here);
                MessageQueue::service_queues(w.into());
                let last_event =
                    frame_system::Pallet::<Test>::events().into_iter().last().expect("No event");

                // Build the one event expected for this limit and compare once.
                let expected = if w < o + 200 {
                    RuntimeEvent::MessageQueue(Event::OverweightEnqueued {
                        id: blake2_256(MESSAGE.as_bytes()),
                        origin: Here,
                        message_index: 0,
                        page_index: 0,
                    })
                } else {
                    RuntimeEvent::MessageQueue(Event::Processed {
                        origin: Here,
                        weight_used: 200.into(),
                        id: blake2_256(MESSAGE.as_bytes()).into(),
                        success: true,
                    })
                };
                assert_eq!(last_event.event, expected);
            });
        }
    }
}
/// Two empty books that are forced into the ready ring must be serviced without panicking:
/// no weight is consumed and both end up removed from the ring.
#[test]
// Only built without debug assertions; presumably a debug-only defensive check would fire
// otherwise -- confirm against the pallet implementation.
#[cfg(not(debug_assertions))]
fn ready_but_empty_does_not_panic() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        let queues = [Here, There];
        // Store an empty book for each queue first, then knit them in the same order.
        for queue in queues {
            BookStateFor::<Test>::insert(queue, empty_book::<Test>());
        }
        for queue in &queues {
            knit(queue);
        }
        assert_ring(&queues);

        let consumed = MessageQueue::service_queues(Weight::MAX);
        assert_eq!(consumed, 0.into_weight());
        assert_ring(&[]);
    });
}
/// A queue whose only message was too heavy to process, and which is then forced back into
/// the ready ring, must be serviced without panicking and be removed from the ring again.
#[test]
// Only built without debug assertions; presumably a debug-only defensive check would fire
// otherwise -- confirm against the pallet implementation.
#[cfg(not(debug_assertions))]
fn ready_but_perm_overweight_does_not_panic() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        MessageQueue::enqueue_message(msg("weight=9"), Here);
        // A limit of 8 is not enough: nothing is consumed and the queue leaves the ring.
        let first_pass = MessageQueue::service_queues(8.into_weight());
        assert_eq!(first_pass, 0.into_weight());
        assert_ring(&[]);

        // Put the queue back into the ring by hand.
        knit(&Here);
        assert_ring(&[Here]);

        // Servicing again consumes nothing and removes the queue once more.
        let second_pass = MessageQueue::service_queues(Weight::MAX);
        assert_eq!(second_pass, 0.into_weight());
        assert_ring(&[]);
    });
}
/// Knitting and unknitting a single queue repeatedly toggles the ready ring between
/// `[Here]` and empty.
#[test]
fn ready_ring_knit_basic_works() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        BookStateFor::<Test>::insert(Here, empty_book::<Test>());

        // Five knit/unknit round trips, i.e. ten alternating operations in total.
        for _ in 0..5 {
            knit(&Here);
            assert_ring(&[Here]);
            unknit(&Here);
            assert_ring(&[]);
        }
        assert_ring(&[]);
    });
}
/// Knits three paused queues into the ready ring one by one and unknits them in the same
/// order, checking the ring contents after every step.
#[test]
fn ready_ring_knit_and_unknit_works() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        let queues = [Here, There, Everywhere(0)];
        for queue in queues {
            BookStateFor::<Test>::insert(queue, empty_book::<Test>());
        }
        // All three queues are paused; the ring assertions below hold regardless.
        PausedQueues::set(queues.to_vec());
        assert_ring(&[]);

        // After knitting the first `idx + 1` queues the ring holds exactly that prefix.
        for (idx, queue) in queues.iter().enumerate() {
            knit(queue);
            assert_ring(&queues[..=idx]);
        }

        // Unknitting from the front leaves the remaining suffix.
        for (idx, queue) in queues.iter().enumerate() {
            unknit(queue);
            assert_ring(&queues[idx + 1..]);
        }
    });
}
/// Enqueues one-byte messages one at a time until three pages exist, then a three-byte message
/// that lands on a fourth page, checking `OnQueueChanged` notifications and book state.
#[test]
fn enqueue_message_works() {
    use MessageOrigin::*;

    // Heap size divided by (maximum encoded header length + 1 payload byte).
    let heap_size = <Test as Config>::HeapSize::get() as u64;
    let header_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64;
    let msgs_per_page = heap_size / (header_len + 1);

    build_and_execute::<Test>(|| {
        let total = msgs_per_page * 3;
        (1..=total).for_each(|sent| {
            MessageQueue::enqueue_message(msg("a"), Here);
            // Each enqueue reports the running message count and byte size.
            assert_eq!(
                QueueChanges::take(),
                vec![(Here, sent, sent)],
                "OnQueueChanged not called"
            );
        });
        assert_eq!(Pages::<Test>::iter().count(), 3);

        // One more message (three bytes) opens a fourth page.
        MessageQueue::enqueue_message(msg("abc"), Here);
        assert_eq!(QueueChanges::take(), vec![(Here, total + 1, total + 3)]);
        assert_eq!(Pages::<Test>::iter().count(), 4);

        // Exactly one book exists and its counters match what was enqueued.
        assert_eq!(BookStateFor::<Test>::iter().count(), 1);
        let state = BookStateFor::<Test>::get(Here);
        assert_eq!(state.message_count, total + 1);
        assert_eq!(state.size, total + 3);
        assert_eq!((state.begin, state.end), (0, 4));
        assert_eq!(state.count as usize, Pages::<Test>::iter().count());
    });
}
/// Batch counterpart of `enqueue_message_works`: enqueues enough one-byte messages in a single
/// call to create three pages, then a three-byte message that lands on a fourth page.
#[test]
fn enqueue_messages_works() {
    use MessageOrigin::*;

    // Heap size divided by (maximum encoded header length + 1 payload byte).
    let heap_size = <Test as Config>::HeapSize::get() as u64;
    let header_len = ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u64;
    let msgs_per_page = heap_size / (header_len + 1);

    build_and_execute::<Test>(|| {
        let total = msgs_per_page * 3;
        MessageQueue::enqueue_messages((0..total).map(|_| msg("a")), Here);
        // The whole batch is reported as a single queue change.
        assert_eq!(
            QueueChanges::take(),
            vec![(Here, total, total)],
            "OnQueueChanged not called"
        );
        assert_eq!(Pages::<Test>::iter().count(), 3);

        // One more message (three bytes) opens a fourth page.
        MessageQueue::enqueue_message(msg("abc"), Here);
        assert_eq!(QueueChanges::take(), vec![(Here, total + 1, total + 3)]);
        assert_eq!(Pages::<Test>::iter().count(), 4);

        // Exactly one book exists and its counters match what was enqueued.
        assert_eq!(BookStateFor::<Test>::iter().count(), 1);
        let state = BookStateFor::<Test>::get(Here);
        assert_eq!(state.message_count, total + 1);
        assert_eq!(state.size, total + 3);
        assert_eq!((state.begin, state.end), (0, 4));
        assert_eq!(state.count as usize, Pages::<Test>::iter().count());
    });
}
/// Walks two queues (`Here` and `There`) through a series of pause/unpause steps and checks
/// after each servicing call which message was processed and which queue change was reported.
#[test]
fn service_queues_suspend_works() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// Three messages of 1, 2 and 3 bytes per queue: 3 messages / 6 bytes each.
MessageQueue::enqueue_messages(vec![msg("a"), msg("ab"), msg("abc")].into_iter(), Here);
MessageQueue::enqueue_messages(vec![msg("x"), msg("xy"), msg("xyz")].into_iter(), There);
assert_eq!(QueueChanges::take(), vec![(Here, 3, 6), (There, 3, 6)]);
// Pause `Here` only.
PausedQueues::set(vec![Here]);
assert_eq!(
(true, false),
(
<Test as Config>::QueuePausedQuery::is_paused(&Here),
<Test as Config>::QueuePausedQuery::is_paused(&There)
)
);
// With `Here` paused, the first message of `There` is processed instead.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("x"), There)]);
assert_eq!(QueueChanges::take(), vec![(There, 2, 5)]);
// Unpause everything.
PausedQueues::take();
assert_eq!(
(false, false),
(
<Test as Config>::QueuePausedQuery::is_paused(&Here),
<Test as Config>::QueuePausedQuery::is_paused(&There)
)
);
// The next call still serves `There`...
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("xy"), There)]);
assert_eq!(QueueChanges::take(), vec![(There, 1, 3)]);
// ...and the one after that moves on to `Here`.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("a"), Here)]);
assert_eq!(QueueChanges::take(), vec![(Here, 2, 5)]);
// Pause `There` only.
PausedQueues::set(vec![There]);
assert_eq!(
(false, true),
(
<Test as Config>::QueuePausedQuery::is_paused(&Here),
<Test as Config>::QueuePausedQuery::is_paused(&There)
)
);
// With `There` paused, `Here` keeps being served.
assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("ab"), Here)]);
assert_eq!(QueueChanges::take(), vec![(Here, 1, 3)]);
// Unpause everything again.
PausedQueues::take();
assert_eq!(
(false, false),
(
<Test as Config>::QueuePausedQuery::is_paused(&Here),
<Test as Config>::QueuePausedQuery::is_paused(&There)
)
);
// Two units of weight drain the last message of each queue.
assert_eq!(MessageQueue::service_queues(2.into_weight()), 2.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(vmsg("abc"), Here), (vmsg("xyz"), There)]);
assert_eq!(QueueChanges::take(), vec![(Here, 0, 0), (There, 0, 0)]);
});
}
/// `execute_overweight` must refuse to run a message of a paused queue and succeed with the
/// very same arguments once the queue is unpaused.
#[test]
fn execute_overweight_respects_suspension() {
    build_and_execute::<Test>(|| {
        let queue = MessageOrigin::Here;
        // Address of the overweight message: (origin, page index, message index).
        let overweight_addr = (queue, 0, 0);

        // Make the message overweight by servicing with far too little weight.
        MessageQueue::enqueue_message(msg("weight=200"), queue);
        MessageQueue::service_queues(4.into_weight());
        assert_last_event::<Test>(
            Event::OverweightEnqueued {
                id: blake2_256(b"weight=200"),
                origin: queue,
                message_index: 0,
                page_index: 0,
            }
            .into(),
        );

        // While paused, execution is rejected.
        PausedQueues::set(vec![queue]);
        assert!(<Test as Config>::QueuePausedQuery::is_paused(&queue));
        let paused_attempt =
            <MessageQueue as ServiceQueues>::execute_overweight(Weight::MAX, overweight_addr);
        assert_eq!(paused_attempt, Err(ExecuteOverweightError::QueuePaused));

        // After unpausing, the same call goes through.
        PausedQueues::take();
        assert!(!<Test as Config>::QueuePausedQuery::is_paused(&queue));
        assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight(
            Weight::MAX,
            overweight_addr
        ));
        assert_last_event::<Test>(
            Event::Processed {
                id: blake2_256(b"weight=200").into(),
                origin: queue,
                weight_used: 200.into_weight(),
                success: true,
            }
            .into(),
        );
    });
}
/// A message enqueued on a paused queue is not executed but keeps the queue in the ready
/// ring; after unpausing, the next servicing call processes it.
#[test]
fn service_queue_suspension_ready_ring_works() {
    build_and_execute::<Test>(|| {
        let queue = MessageOrigin::Here;

        // Pause first, then enqueue and try to service.
        PausedQueues::set(vec![queue]);
        MessageQueue::enqueue_message(msg("weight=5"), queue);
        MessageQueue::service_queues(Weight::MAX);

        // No event was emitted, yet the queue is in the ready ring.
        assert!(System::events().is_empty(), "Paused");
        assert_ring(&[queue]);

        // Unpause and service again: the message is processed now.
        PausedQueues::take();
        MessageQueue::service_queues(Weight::MAX);
        let processed = Event::Processed {
            id: blake2_256(b"weight=5").into(),
            origin: queue,
            weight_used: 5.into_weight(),
            success: true,
        };
        assert_last_event::<Test>(processed.into());
    });
}
/// Checks that `do_integrity_test` passes or fails depending on how the mocked weight functions
/// relate to the configured service weight.
#[test]
fn integrity_test_checks_service_weight() {
build_and_execute::<Test>(|| {
// Preconditions: the service weight is `Some(100)` and the integrity test passes.
assert_eq!(<Test as Config>::ServiceWeight::get(), Some(100.into()), "precond");
assert!(MessageQueue::do_integrity_test().is_ok(), "precond");
// A default weight of 20 per call still passes.
DefaultWeightForCall::set(20.into());
assert!(MessageQueue::do_integrity_test().is_ok());
// With 101 per call the single-message overhead is 505 and the integrity test fails.
DefaultWeightForCall::set(101.into());
assert_eq!(MessageQueue::single_msg_overhead(), 505.into());
assert!(MessageQueue::do_integrity_test().is_err());
// Raising any single one of these weight functions to 101 is enough to fail.
for f in [
"bump_service_head",
"service_queue_base",
"service_page_base_completion",
"service_page_base_no_completion",
"service_page_item",
"ready_ring_unknit",
] {
// Start each round from a zero default, which passes; `WeightForCall::take()`
// presumably clears the per-call override of the previous round -- confirm in the mock.
WeightForCall::take();
DefaultWeightForCall::set(Zero::zero());
assert!(MessageQueue::do_integrity_test().is_ok());
set_weight(f, 101.into());
assert!(MessageQueue::do_integrity_test().is_err());
}
});
}
/// Regression test for issue 2319: a message enqueued to another queue from within a message
/// callback must be processed by the following servicing call.
#[test]
fn regression_issue_2319() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        // Processing a callback message enqueues a follow-up message on `There`.
        Callback::set(Box::new(|_, _| {
            MessageQueue::enqueue_message(mock_helpers::msg("anothermessage"), There);
            Ok(())
        }));

        MessageQueue::enqueue_message(msg("callback=0"), Here);

        // First pass: the callback message itself.
        let first_pass = MessageQueue::service_queues(1.into_weight());
        assert_eq!(first_pass, 1.into_weight());
        assert_eq!(MessagesProcessed::take(), vec![(b"callback=0".to_vec(), Here)]);

        // Second pass: the message that the callback enqueued.
        let second_pass = MessageQueue::service_queues(1.into_weight());
        assert_eq!(second_pass, 1.into_weight());
        assert_eq!(MessagesProcessed::take(), vec![(b"anothermessage".to_vec(), There)]);
    });
}
/// Messages may enqueue further messages while they are being processed.
///
/// The callback fans out in stages: stage 0 enqueues one stage-1 message; stage 1 enqueues 100
/// stage-2 messages on its own queue and one stage-3 message on each of 100 other queues;
/// stages 2 and 3 each enqueue one stage-4 message; stage 4 does nothing. That makes
/// 1 + 1 + 100 + 100 + 200 = 402 messages, each serviced with one unit of weight.
#[test]
fn recursive_enqueue_works() {
    build_and_execute::<Test>(|| {
        Callback::set(Box::new(|o, i| {
            match i {
                0 => {
                    MessageQueue::enqueue_message(msg("callback=1"), *o);
                },
                1 => {
                    for _ in 0..100 {
                        MessageQueue::enqueue_message(msg("callback=2"), *o);
                    }
                    // One message per queue id; a distinct name avoids shadowing `i`.
                    for queue_id in 0..100 {
                        MessageQueue::enqueue_message(msg("callback=3"), queue_id.into());
                    }
                },
                2 | 3 => {
                    MessageQueue::enqueue_message(msg("callback=4"), *o);
                },
                4 => (),
                _ => unreachable!(),
            };
            Ok(())
        }));

        MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);

        // Every one of the 402 servicing calls must find exactly one message to process.
        for _ in 0..402 {
            assert_eq!(MessageQueue::service_queues(1.into_weight()), 1.into_weight());
        }
        // Nothing is left afterwards.
        assert_eq!(MessageQueue::service_queues(Weight::MAX), Weight::zero());
        assert_eq!(MessagesProcessed::take().len(), 402);
    });
}
/// Calling `service_queues` from within a message callback must not change storage, and must
/// not disturb the processing of messages enqueued around that call.
#[test]
fn recursive_service_is_forbidden() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
Callback::set(Box::new(|_, _| {
MessageQueue::enqueue_message(msg("m1"), There);
// The nested servicing call is asserted to leave storage untouched.
assert_storage_noop!(MessageQueue::service_queues(10.into_weight()));
MessageQueue::enqueue_message(msg("m2"), There);
Ok(())
}));
// Repeat to show that the state stays consistent across rounds.
for _ in 0..5 {
MessageQueue::enqueue_message(msg("callback=0"), Here);
MessageQueue::service_queues(3.into_weight());
// The callback message plus both messages it enqueued are processed, in this order.
assert_eq!(
MessagesProcessed::take(),
vec![
(b"callback=0".to_vec(), Here),
(b"m1".to_vec(), There),
(b"m2".to_vec(), There)
]
);
}
});
}
/// Calling `execute_overweight` from within a message callback must fail with
/// `RecursiveDisallowed` and change nothing; the same message can be executed afterwards.
#[test]
fn recursive_overweight_while_service_is_forbidden() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
Callback::set(Box::new(|_, _| {
// By the time the callback runs, the `There` message has been reported overweight.
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=200"),
origin: There,
message_index: 0,
page_index: 0,
}
.into(),
);
// The nested call is rejected and asserted to be a storage no-op.
assert_noop!(
<MessageQueue as ServiceQueues>::execute_overweight(
10.into_weight(),
(There, 0, 0)
),
ExecuteOverweightError::RecursiveDisallowed
);
Ok(())
}));
MessageQueue::enqueue_message(msg("weight=200"), There);
MessageQueue::enqueue_message(msg("callback=0"), Here);
// A limit of 5 makes the `There` message overweight and then runs the callback message.
MessageQueue::service_queues(5.into_weight());
// Outside of servicing, executing the overweight message works.
assert_ok!(<MessageQueue as ServiceQueues>::execute_overweight(
200.into_weight(),
(There, 0, 0)
));
});
}
/// Registers a callback that expects `do_reap_page` to fail with `RecursiveDisallowed`, then
/// checks that reaping page 0 works when called outside of message processing.
#[test]
fn recursive_reap_page_is_forbidden() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// NOTE(review): no `callback=` message is enqueued below, so this callback does not appear to
// be triggered by this test -- confirm against the mock processor.
Callback::set(Box::new(|_, _| {
assert_noop!(MessageQueue::do_reap_page(&Here, 0), Error::<Test>::RecursiveDisallowed);
Ok(())
}));
// Enqueue ten more messages than the stale limit; presumably each one occupies its own page.
let n = (MaxStale::get() + 10) as usize;
for _ in 0..n {
MessageQueue::enqueue_message(msg("weight=200"), Here);
}
// Service with far too little weight for any of the messages.
MessageQueue::service_queues(1.into_weight());
// Reaping the first page succeeds and is reported.
assert_ok!(MessageQueue::do_reap_page(&Here, 0));
assert_last_event::<Test>(Event::PageReaped { origin: Here, index: 0 }.into());
});
}
/// `with_service_mutex` runs its closure when called at the top level and returns an error,
/// without running the closure, when called from inside another `with_service_mutex` call.
#[test]
fn with_service_mutex_works() {
let mut called = 0;
// A plain top-level call runs the closure.
with_service_mutex(|| called = 1).unwrap();
assert_eq!(called, 1);
// The outer call succeeds; the nested call errors and never reaches `unreachable!`.
with_service_mutex(|| with_service_mutex(|| unreachable!()))
.unwrap()
.unwrap_err();
// Same check with the inner error unwrapped inside the outer closure.
with_service_mutex(|| with_service_mutex(|| unreachable!()).unwrap_err()).unwrap();
// Several nested attempts fail in a row while the outer closure keeps running.
with_service_mutex(|| {
with_service_mutex(|| unreachable!()).unwrap_err();
with_service_mutex(|| unreachable!()).unwrap_err();
called = 2;
})
.unwrap();
assert_eq!(called, 2);
// After all of the above, a top-level call works again.
with_service_mutex(|| called = 3).unwrap();
assert_eq!(called, 3);
}
/// Enqueued messages are processed by `on_initialize`, and messages enqueued afterwards are
/// processed by `on_idle` when it is given weight to work with.
#[test]
fn process_enqueued_on_idle() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        let first_batch = vec![msg("a"), msg("ab"), msg("abc")];
        MessageQueue::enqueue_messages(first_batch.into_iter(), Here);
        assert_eq!(BookStateFor::<Test>::iter().count(), 1);

        // `on_initialize` drains the first batch.
        Pallet::<Test>::on_initialize(1);
        let processed_on_init = MessagesProcessed::take();
        assert_eq!(
            processed_on_init,
            vec![(b"a".to_vec(), Here), (b"ab".to_vec(), Here), (b"abc".to_vec(), Here),]
        );

        let second_batch = vec![msg("x"), msg("xy"), msg("xyz")];
        MessageQueue::enqueue_messages(second_batch.into_iter(), There);
        assert_eq!(BookStateFor::<Test>::iter().count(), 2);

        // `on_idle` drains the second batch with the weight it is handed.
        Pallet::<Test>::on_idle(1, Weight::from_parts(100, 100));
        let processed_on_idle = MessagesProcessed::take();
        assert_eq!(
            processed_on_idle,
            vec![(b"x".to_vec(), There), (b"xy".to_vec(), There), (b"xyz".to_vec(), There)]
        );
    });
}
/// With zero remaining weight, `on_idle` must neither process messages nor report any of them
/// as overweight.
#[test]
fn process_enqueued_on_idle_requires_enough_weight() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        Pallet::<Test>::on_initialize(1);

        let batch = vec![msg("x"), msg("xy"), msg("xyz")];
        MessageQueue::enqueue_messages(batch.into_iter(), There);
        assert_eq!(BookStateFor::<Test>::iter().count(), 1);

        // No weight at all is available on idle.
        Pallet::<Test>::on_idle(1, Weight::from_parts(0, 0));
        assert!(MessagesProcessed::take().is_empty());

        // Every recorded event must be something other than `OverweightEnqueued`.
        let no_overweight_event = System::events().into_iter().all(|e| {
            !matches!(
                e.event,
                RuntimeEvent::MessageQueue(Event::<Test>::OverweightEnqueued { .. })
            )
        });
        assert!(no_overweight_event);
    });
}
/// A message whose processing fails with `StackLimitReached` during regular servicing is
/// reported as failed, is not counted as processed, and leaves no page behind.
#[test]
fn process_discards_stack_ov_message() {
    use MessageOrigin::*;

    build_and_execute::<Test>(|| {
        let payload = "stacklimitreached";
        MessageQueue::enqueue_message(msg(payload), Here);
        MessageQueue::service_queues(10.into_weight());

        let failure = Event::ProcessingFailed {
            id: blake2_256(payload.as_bytes()).into(),
            origin: Here,
            error: ProcessMessageError::StackLimitReached,
        };
        assert_last_event::<Test>(failure.into());

        assert!(MessagesProcessed::take().is_empty());
        // The message was dropped together with its page.
        assert_pages(&[]);
    });
}
/// An overweight message whose manual execution fails with `StackLimitReached` must stay in the
/// queue unchanged, so that a later `execute_overweight` call can still process it.
#[test]
fn execute_overweight_keeps_stack_ov_message() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// Presumably makes the mock processor skip the stack-limit error for now -- confirm in the mock.
IgnoreStackOvError::set(true);
MessageQueue::enqueue_message(msg("weight=200 stacklimitreached"), Here);
// With zero weight the message is reported as overweight.
MessageQueue::service_queues(0.into_weight());
assert_last_event::<Test>(
Event::OverweightEnqueued {
id: blake2_256(b"weight=200 stacklimitreached"),
origin: MessageOrigin::Here,
message_index: 0,
page_index: 0,
}
.into(),
);
// Nothing counts as processed and page 0 still exists.
assert!(MessagesProcessed::take().is_empty());
assert_pages(&[0]);
// From here on the stack-limit error is no longer ignored.
IgnoreStackOvError::set(false);
// Events are reset before the guard is created and again before it is dropped, so that only
// the failed execution attempt itself is covered by the storage no-op check.
System::reset_events();
let storage_noop = StorageNoopGuard::new();
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(3.into_weight(), (Here, 0, 0)),
Err(ExecuteOverweightError::Other)
);
assert_last_event::<Test>(
Event::ProcessingFailed {
id: blake2_256(b"weight=200 stacklimitreached").into(),
origin: MessageOrigin::Here,
error: ProcessMessageError::StackLimitReached,
}
.into(),
);
System::reset_events();
drop(storage_noop);
// With the error ignored again, executing with enough weight processes the message.
IgnoreStackOvError::set(true);
assert_eq!(
<MessageQueue as ServiceQueues>::execute_overweight(200.into_weight(), (Here, 0, 0))
.unwrap(),
200.into_weight()
);
assert_last_event::<Test>(
Event::Processed {
id: blake2_256(b"weight=200 stacklimitreached").into(),
origin: MessageOrigin::Here,
weight_used: 200.into_weight(),
success: true,
}
.into(),
);
// The page is gone once its only message has been processed.
assert_pages(&[]);
System::reset_events();
});
}
/// Storage written by a message callback that returns `Err` must be rolled back.
#[test]
fn process_message_error_reverts_storage_changes() {
    const KEY: &[u8] = b"key";

    build_and_execute::<Test>(|| {
        assert!(!sp_io::storage::exists(KEY), "Key should not exist");

        // The callback writes the key and then reports failure.
        Callback::set(Box::new(|_, _| {
            sp_io::storage::set(KEY, b"value");
            Err(())
        }));

        MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
        MessageQueue::service_queues(10.into_weight());

        assert!(!sp_io::storage::exists(KEY), "Key should have been rolled back");
    });
}
/// Storage written by a message callback must be kept when processing does not return an
/// error, even if the message reports itself as unsuccessful.
#[test]
fn process_message_ok_false_keeps_storage_changes() {
    build_and_execute::<Test>(|| {
        assert!(!sp_io::storage::exists(b"key"), "Key should not exist");

        Callback::set(Box::new(|_, _| {
            sp_io::storage::set(b"key", b"value");
            Ok(())
        }));

        // NOTE(review): the `000` suffix presumably makes the mock processor return
        // `Ok(false)` -- confirm against the mock.
        MessageQueue::enqueue_message(msg("callback=000"), MessageOrigin::Here);
        MessageQueue::service_queues(10.into_weight());

        assert!(sp_io::storage::exists(b"key"), "Key should have been kept");
    });
}
/// Storage written by a message callback must be kept when the message is processed
/// successfully.
#[test]
fn process_message_ok_true_keeps_storage_changes() {
    build_and_execute::<Test>(|| {
        assert!(!sp_io::storage::exists(b"key"), "Key should not exist");

        Callback::set(Box::new(|_, _| {
            sp_io::storage::set(b"key", b"value");
            Ok(())
        }));

        MessageQueue::enqueue_message(msg("callback=0"), MessageOrigin::Here);
        MessageQueue::service_queues(10.into_weight());

        assert!(sp_io::storage::exists(b"key"), "Key should have been kept");
    });
}
/// Shows that `force_set_head` can move servicing to a chosen queue ahead of a queue that would
/// otherwise have been served next.
#[test]
fn force_set_head_can_starve_other_queues() {
use MessageOrigin::*;
build_and_execute::<Test>(|| {
// Two messages on each of three queues.
for _ in 0..2 {
MessageQueue::enqueue_message(msg("A"), Here);
MessageQueue::enqueue_message(msg("B"), There);
MessageQueue::enqueue_message(msg("C"), Everywhere(0));
}
// Four units of weight drain `Here` and `There`; `Everywhere(0)` is not reached.
MessageQueue::service_queues(4.into_weight());
assert_eq!(
MessagesProcessed::take(),
vec![
(b"A".to_vec(), Here),
(b"A".to_vec(), Here),
(b"B".to_vec(), There),
(b"B".to_vec(), There)
]
);
// Give `Here` new work.
MessageQueue::enqueue_message(msg("A"), Here);
// Without intervention, the next message to be served would come from `Everywhere(0)`.
// NOTE(review): `hypothetically!` is expected to discard the effects of this block -- confirm
// in the `frame_support` documentation.
frame_support::hypothetically! {{
MessageQueue::service_queues(1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(b"C".to_vec(), Everywhere(0))]);
}};
// Forcing the head to `Here` makes its message the next one served instead.
assert!(Pallet::<Test>::force_set_head(&mut WeightMeter::new(), &Here).unwrap());
MessageQueue::service_queues(1.into_weight());
assert_eq!(MessagesProcessed::take(), vec![(b"A".to_vec(), Here)]);
});
}
/// `force_set_head` on a queue that is not in the ready ring reports `false` and leaves
/// storage untouched.
#[test]
fn force_set_head_noop_on_unready_queue() {
    use crate::tests::MessageOrigin::*;

    build_and_execute::<Test>(|| {
        // Process the only message so that the ready ring is empty.
        MessageQueue::enqueue_message(msg("A"), Here);
        MessageQueue::service_queues(1.into_weight());
        assert_ring(&[]);

        // The guard stays alive until the end of the closure and covers the call below.
        let _storage_noop = StorageNoopGuard::new();
        let mut meter = WeightMeter::new();
        let outcome = Pallet::<Test>::force_set_head(&mut meter, &There);
        assert!(!outcome.unwrap());
    });
}
/// `force_set_head` on the queue that is already the service head reports `true` without
/// changing storage.
#[test]
fn force_set_head_noop_on_current_head() {
use crate::tests::MessageOrigin::*;
build_and_execute::<Test>(|| {
// Two messages, one of which is processed: `Here` stays in the ready ring.
MessageQueue::enqueue_message(msg("A"), Here);
MessageQueue::enqueue_message(msg("A"), Here);
MessageQueue::service_queues(1.into_weight());
assert_ring(&[Here]);
// The guard lives until the end of the closure, so the call below is checked to be a
// storage no-op.
let _guard = StorageNoopGuard::new();
let was_set = Pallet::<Test>::force_set_head(&mut WeightMeter::new(), &Here).unwrap();
assert!(was_set);
});
}
/// `force_set_head` on a ready queue that has not been serviced yet reports `true` without
/// changing storage.
#[test]
fn force_set_head_noop_unprocessed_queue() {
    use crate::tests::MessageOrigin::*;

    build_and_execute::<Test>(|| {
        MessageQueue::enqueue_message(msg("A"), Here);
        assert_ring(&[Here]);

        // The guard stays alive until the end of the closure and covers the call below.
        let _storage_noop = StorageNoopGuard::new();
        let mut meter = WeightMeter::new();
        let outcome = Pallet::<Test>::force_set_head(&mut meter, &Here);
        assert!(outcome.unwrap());
    });
}
/// `force_set_head` moves the service head to the requested ready queue, which also rotates
/// the order in which the ready ring is reported.
#[test]
fn force_set_head_works() {
    use crate::tests::MessageOrigin::*;

    build_and_execute::<Test>(|| {
        MessageQueue::enqueue_message(msg("A"), Here);
        MessageQueue::enqueue_message(msg("B"), There);

        // Initially the first queue that received a message is the head.
        assert_eq!(ServiceHead::<Test>::get(), Some(Here));
        assert_ring(&[Here, There]);

        let mut meter = WeightMeter::new();
        assert!(Pallet::<Test>::force_set_head(&mut meter, &There).unwrap());

        // The head has moved and the ring now starts at `There`.
        assert_eq!(ServiceHead::<Test>::get(), Some(There));
        assert_ring(&[There, Here]);
    });
}
/// Calls `MessageQueue::get_batches_footprints` for zero-filled messages of the given `sizes`
/// and compares the result with the expectation.
///
/// * `origin` - queue for which the footprints are computed.
/// * `sizes` - byte length of each message in the batch; each must fit `MaxMessageLenOf<Test>`.
/// * `total_pages_limit` - page limit handed to `get_batches_footprints`.
/// * `expected_first_page_pos` - expected `first_page_pos` of the result.
/// * `expected_new_pages_counts` - expected `new_pages_count` of every returned footprint, in
///   order; its length is the expected number of footprints.
///
/// The expected footprint at index `i` covers the first `i + 1` messages and their summed size.
///
/// Panics if a size exceeds the maximum message length or if any expectation is not met.
fn check_get_batches_footprints(
    origin: MessageOrigin,
    sizes: &[u32],
    total_pages_limit: u32,
    expected_first_page_pos: usize,
    expected_new_pages_counts: Vec<u32>,
) {
    // Zero-filled messages of the requested sizes.
    let msgs = sizes
        .iter()
        .map(|size| {
            BoundedVec::<u8, MaxMessageLenOf<Test>>::try_from(vec![0; *size as usize]).unwrap()
        })
        .collect::<Vec<_>>();

    let batches_footprints = MessageQueue::get_batches_footprints(
        origin,
        msgs.iter().map(|m| m.as_bounded_slice()),
        total_pages_limit,
    );
    assert_eq!(batches_footprints.first_page_pos, expected_first_page_pos);
    assert_eq!(batches_footprints.footprints.len(), expected_new_pages_counts.len());

    // Accumulate the message sizes to build the expected footprint of every batch prefix.
    let mut running_size = 0usize;
    let expected_batches_footprint = expected_new_pages_counts
        .iter()
        .enumerate()
        .map(|(idx, new_pages_count)| {
            running_size += msgs[idx].len();
            BatchFootprint {
                msgs_count: idx + 1,
                size_in_bytes: running_size,
                new_pages_count: *new_pages_count,
            }
        })
        .collect::<Vec<_>>();
    assert_eq!(batches_footprints.footprints, expected_batches_footprint);
}
/// Exercises `get_batches_footprints` through `check_get_batches_footprints`, first on an empty
/// queue and then on a queue that already holds messages.
///
/// The arguments of each check are: origin, message sizes, total page limit, expected first page
/// position and expected new-page count per returned footprint.
#[test]
fn get_batches_footprints_works() {
use crate::tests::MessageOrigin::*;
let max_message_len = MaxMessageLenOf::<Test>::get();
let header_size = ItemHeader::<<Test as Config>::Size>::max_encoded_len() as u32;
build_and_execute::<Test>(|| {
// Empty queue: a page limit of 0 yields no footprint at all.
check_get_batches_footprints(Here, &[max_message_len], 0, 0, vec![]);
check_get_batches_footprints(Here, &[max_message_len], 1, 0, vec![1]);
// A second message after a maximum-length one is only covered if a second page is allowed.
check_get_batches_footprints(Here, &[max_message_len, 1], 1, 0, vec![1]);
check_get_batches_footprints(Here, &[max_message_len, 1], 2, 0, vec![1, 2]);
// Of the two one-byte messages that follow, only the first is covered with a limit of one page.
check_get_batches_footprints(
Here,
&[max_message_len - 2 * header_size, 1, 1],
1,
0,
vec![1, 1],
);
// Put a maximum-length message and an empty message into the queue.
MessageQueue::enqueue_message(msg("A".repeat(max_message_len as usize).as_str()), Here);
MessageQueue::enqueue_message(msg(""), Here);
// From here on the expected first page position is 5 and the first message of each batch
// needs no new page, provided the page limit is at least 2.
check_get_batches_footprints(Here, &[max_message_len - header_size], 1, 5, vec![]);
check_get_batches_footprints(Here, &[max_message_len - header_size], 2, 5, vec![0]);
check_get_batches_footprints(Here, &[max_message_len - header_size, 1], 2, 5, vec![0]);
check_get_batches_footprints(Here, &[max_message_len - header_size, 1], 3, 5, vec![0, 1]);
check_get_batches_footprints(
Here,
&[max_message_len - header_size, max_message_len - 2 * header_size, 1, 1],
3,
5,
vec![0, 1, 1],
);
// A different, still empty queue behaves like the empty-queue case above.
check_get_batches_footprints(There, &[max_message_len], 1, 0, vec![1]);
});
}