Skip to main content

topsoil_core/traits/
messages.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: Apache-2.0 OR GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Traits for managing message queuing and handling.
8
9use super::storage::Footprint;
10use crate::defensive;
11
12use alloc::vec::Vec;
13use codec::{Decode, DecodeWithMemTracking, Encode, FullCodec, MaxEncodedLen};
14use core::{cmp::Ordering, fmt::Debug, marker::PhantomData};
15use scale_info::TypeInfo;
16use subsoil::core::{ConstU32, Get, TypedGet};
17use subsoil::runtime::{traits::Convert, BoundedSlice};
18use subsoil::weights::{Weight, WeightMeter};
19
/// Errors that can happen when attempting to process a message with
/// [`ProcessMessage::process_message()`].
// NOTE(review): this enum derives `Encode`/`Decode`; presumably the variant order determines the
// encoded representation, so avoid reordering variants — confirm against the codec docs.
#[derive(Copy, Clone, Eq, PartialEq, Encode, Decode, DecodeWithMemTracking, TypeInfo, Debug)]
pub enum ProcessMessageError {
	/// The message data format is unknown (e.g. unrecognised header).
	BadFormat,
	/// The message data is bad (e.g. decoding returns an error).
	Corrupt,
	/// The message format is unsupported (e.g. old XCM version).
	Unsupported,
	/// Message processing was not attempted because it was not certain that the weight limit
	/// would be respected. The parameter gives the maximum weight which the message could take
	/// to process.
	Overweight(Weight),
	/// The queue wants to give up its current processing slot.
	///
	/// Hints the message processor to cease servicing this queue and proceed to the next
	/// one. This is seen as a *hint*, not an instruction. Implementations must therefore handle
	/// the case that a queue is re-serviced within the same block after *yielding*. A queue is
	/// not required to *yield* again when it is being re-serviced within the same block.
	Yield,
	/// The message could not be processed because the stack depth limit was reached.
	StackLimitReached,
}
44
/// Can process messages from a specific origin.
pub trait ProcessMessage {
	/// The transport from where a message originates.
	type Origin: FullCodec + MaxEncodedLen + Clone + Eq + PartialEq + TypeInfo + Debug;

	/// Process the given message, using no more than the remaining `meter` weight to do so.
	///
	/// - `message`: The raw bytes of the message to process.
	/// - `origin`: The origin that the message came from.
	/// - `meter`: The weight meter whose remaining weight bounds the processing.
	/// - `id`: A 32-byte buffer that is passed mutably. NOTE(review): presumably a message
	///   identifier that implementations may overwrite — confirm against the implementors.
	///
	/// Returns whether the message was processed.
	///
	/// # Errors
	///
	/// Returns a [`ProcessMessageError`] describing why the message could not be processed.
	fn process_message(
		message: &[u8],
		origin: Self::Origin,
		meter: &mut WeightMeter,
		id: &mut [u8; 32],
	) -> Result<bool, ProcessMessageError>;
}
60
/// Errors that can happen when attempting to execute an overweight message with
/// [`ServiceQueues::execute_overweight()`].
#[derive(Eq, PartialEq, Debug)]
pub enum ExecuteOverweightError {
	/// The referenced message was not found.
	NotFound,
	/// The message was already processed.
	///
	/// This can be treated as a success condition.
	AlreadyProcessed,
	/// The available weight was insufficient to execute the message.
	InsufficientWeight,
	/// The queue is paused and no message can be executed from it.
	///
	/// This can change at any time, so re-trying in the future may succeed.
	QueuePaused,
	/// An unspecified error.
	Other,
	/// Another call is currently ongoing and prevents this call from executing.
	RecursiveDisallowed,
}
82
/// Can service queues and execute overweight messages.
pub trait ServiceQueues {
	/// Addresses a specific overweight message.
	type OverweightMessageAddress;

	/// Service all message queues in some fair manner.
	///
	/// - `weight_limit`: The maximum amount of dynamic weight that this call can use.
	///
	/// Returns the dynamic weight used by this call; is never greater than `weight_limit`.
	/// Should only be called in top-level runtime entry points like `on_initialize` or `on_idle`.
	/// Otherwise, stack depth limit errors may be mishandled.
	fn service_queues(weight_limit: Weight) -> Weight;

	/// Executes a message that could not be executed by [`Self::service_queues()`] because it was
	/// temporarily overweight.
	///
	/// - `_weight_limit`: The weight limit for this call.
	/// - `_address`: The address of the overweight message to execute.
	///
	/// On success, returns a [`Weight`]. NOTE(review): presumably the weight that was consumed —
	/// confirm against the implementors.
	///
	/// The default implementation ignores both arguments and always returns
	/// `Err(ExecuteOverweightError::NotFound)`.
	fn execute_overweight(
		_weight_limit: Weight,
		_address: Self::OverweightMessageAddress,
	) -> Result<Weight, ExecuteOverweightError> {
		Err(ExecuteOverweightError::NotFound)
	}
}
106
107/// Services queues by doing nothing.
108pub struct NoopServiceQueues<OverweightAddr>(PhantomData<OverweightAddr>);
109impl<OverweightAddr> ServiceQueues for NoopServiceQueues<OverweightAddr> {
110	type OverweightMessageAddress = OverweightAddr;
111
112	fn service_queues(_: Weight) -> Weight {
113		Weight::zero()
114	}
115}
116
/// Can enqueue messages for multiple origins.
pub trait EnqueueMessage<Origin: MaxEncodedLen> {
	/// The maximal length any enqueued message may have.
	type MaxMessageLen: Get<u32>;

	/// Enqueue a single `message` from a specific `origin`.
	///
	/// The length of `message` is bounded by [`Self::MaxMessageLen`] through its type.
	fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: Origin);

	/// Enqueue multiple `messages` from a specific `origin`.
	///
	/// Each item of `messages` is bounded by [`Self::MaxMessageLen`] through its type.
	fn enqueue_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		origin: Origin,
	);

	/// Signal that the processing of any remaining unprocessed messages of `origin` should
	/// happen only lazily, not proactively.
	fn sweep_queue(origin: Origin);
}
134
135impl<Origin: MaxEncodedLen> EnqueueMessage<Origin> for () {
136	type MaxMessageLen = ConstU32<0>;
137	fn enqueue_message(_: BoundedSlice<u8, Self::MaxMessageLen>, _: Origin) {}
138	fn enqueue_messages<'a>(
139		_: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
140		_: Origin,
141	) {
142	}
143	fn sweep_queue(_: Origin) {}
144}
145
/// The resource footprint of a queue.
///
/// This is the value returned by [`QueueFootprintQuery::footprint`].
#[derive(Default, Copy, Clone, Eq, PartialEq, Debug)]
pub struct QueueFootprint {
	/// The number of pages in the queue (including overweight pages).
	pub pages: u32,
	/// The number of pages that are ready (not yet processed and also not overweight).
	pub ready_pages: u32,
	/// The storage footprint of the queue (including overweight messages).
	pub storage: Footprint,
}
156
/// The resource footprint of a batch of messages.
///
/// All fields are plain integers, so equality is total and the type derives `Eq` next to
/// `PartialEq`, like [`QueueFootprint`] does.
#[derive(Default, Copy, Clone, Eq, PartialEq, Debug)]
pub struct BatchFootprint {
	/// The number of messages in the batch.
	pub msgs_count: usize,
	/// The total size in bytes of all the messages in the batch.
	pub size_in_bytes: usize,
	/// The number of resulting new pages in the queue if the current batch was added.
	pub new_pages_count: u32,
}
167
/// The resource footprints of contiguous subsets of messages.
///
/// For a set of messages `xcms[0..n]`, each `footprints[i]` contains the footprint
/// of the batch `xcms[0..=i]`, so as `i` increases `footprints[i]` contains the footprint
/// of a bigger batch.
#[derive(Default, Debug)]
pub struct BatchesFootprints {
	/// The position in the first available MQ page where the batch will start being appended.
	///
	/// The messages in the batch will be enqueued to the message queue. Since the message queue is
	/// organized in pages, the messages may be enqueued across multiple contiguous pages.
	/// The position where we start appending messages to the first available MQ page is of
	/// particular importance since it impacts the performance of the enqueuing operation.
	/// That's because the first page has to be decoded first. This is not needed for the following
	/// pages.
	pub first_page_pos: usize,
	/// The cumulative footprints, one entry per pushed message.
	///
	/// Entry `i` describes the batch made of the first `i + 1` messages.
	pub footprints: Vec<BatchFootprint>,
}
186
187impl BatchesFootprints {
188	/// Appends a batch footprint to the back of the collection.
189	///
190	/// The new footprint represents a batch that includes all the messages contained by the
191	/// previous batches plus the provided `msg`. If `new_page` is true, we will consider that
192	/// the provided `msg` is appended to a new message queue page. Otherwise, we consider
193	/// that it is appended to the current page.
194	pub fn push(&mut self, msg: &[u8], new_page: bool) {
195		let previous_footprint =
196			self.footprints.last().map(|footprint| *footprint).unwrap_or_default();
197
198		let mut new_pages_count = previous_footprint.new_pages_count;
199		if new_page {
200			new_pages_count = new_pages_count.saturating_add(1);
201		}
202		self.footprints.push(BatchFootprint {
203			msgs_count: previous_footprint.msgs_count.saturating_add(1),
204			size_in_bytes: previous_footprint.size_in_bytes.saturating_add(msg.len()),
205			new_pages_count,
206		});
207	}
208
209	/// Gets the biggest batch for which the comparator function returns `Ordering::Less`.
210	pub fn search_best_by<F>(&self, f: F) -> &BatchFootprint
211	where
212		F: FnMut(&BatchFootprint) -> Ordering,
213	{
214		// Since the batches are sorted by size, we can use binary search.
215		let maybe_best_idx = match self.footprints.binary_search_by(f) {
216			Ok(last_ok_idx) => Some(last_ok_idx),
217			Err(first_err_idx) => first_err_idx.checked_sub(1),
218		};
219		if let Some(best_idx) = maybe_best_idx {
220			match self.footprints.get(best_idx) {
221				Some(best_footprint) => return best_footprint,
222				None => {
223					defensive!("Invalid best_batch_idx: {}", best_idx);
224				},
225			}
226		}
227		&BatchFootprint { msgs_count: 0, size_in_bytes: 0, new_pages_count: 0 }
228	}
229}
230
/// Provides information on queue footprint.
pub trait QueueFootprintQuery<Origin> {
	/// The maximal length any enqueued message may have.
	type MaxMessageLen: Get<u32>;

	/// Return the state footprint of the queue that belongs to `origin`.
	fn footprint(origin: Origin) -> QueueFootprint;

	/// Get the `BatchFootprint` for each batch of messages `[0..n]`
	/// as long as the total number of pages would be <= `total_pages_limit`.
	///
	/// - `origin`: The origin whose queue the messages would be enqueued to.
	/// - `msgs`: The messages to compute the batch footprints for.
	/// - `total_pages_limit`: The maximum total number of pages the queue may end up with.
	///
	/// # Examples
	///
	/// Let's consider that each message would result in a new page and that there's already 1
	/// full page in the queue. Then, for the messages `["1", "2", "3"]`
	/// and `total_pages_limit = 3`, `get_batches_footprints()` would return:
	/// ```
	/// use topsoil_core::traits::BatchFootprint;
	///
	/// vec![
	/// 	// The footprint of batch ["1"]
	/// 	BatchFootprint {
	/// 		msgs_count: 1,
	/// 		size_in_bytes: 1,
	/// 		new_pages_count: 1, // total pages count = 2
	/// 	},
	/// 	// The footprint of batch ["1", "2"]
	/// 	BatchFootprint {
	/// 		msgs_count: 2,
	/// 		size_in_bytes: 2,
	/// 		new_pages_count: 2, // total pages count = 3
	/// 	},
	/// 	// For the batch ["1", "2", "3"], the total pages count would be 4, which would exceed
	/// 	// the `total_pages_limit`.
	/// ];
	/// ```
	fn get_batches_footprints<'a>(
		origin: Origin,
		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
		total_pages_limit: u32,
	) -> BatchesFootprints;
}
273
274impl<Origin: MaxEncodedLen> QueueFootprintQuery<Origin> for () {
275	type MaxMessageLen = ConstU32<0>;
276
277	fn footprint(_: Origin) -> QueueFootprint {
278		QueueFootprint::default()
279	}
280
281	fn get_batches_footprints<'a>(
282		_origin: Origin,
283		_msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
284		_total_pages_limit: u32,
285	) -> BatchesFootprints {
286		BatchesFootprints::default()
287	}
288}
289
290/// Transform the origin of an [`EnqueueMessage`] via `C::convert`.
291pub struct TransformOrigin<E, O, N, C>(PhantomData<(E, O, N, C)>);
292impl<E: EnqueueMessage<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>> EnqueueMessage<N>
293	for TransformOrigin<E, O, N, C>
294{
295	type MaxMessageLen = E::MaxMessageLen;
296
297	fn enqueue_message(message: BoundedSlice<u8, Self::MaxMessageLen>, origin: N) {
298		E::enqueue_message(message, C::convert(origin));
299	}
300
301	fn enqueue_messages<'a>(
302		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
303		origin: N,
304	) {
305		E::enqueue_messages(messages, C::convert(origin));
306	}
307
308	fn sweep_queue(origin: N) {
309		E::sweep_queue(C::convert(origin));
310	}
311}
312
313impl<E: QueueFootprintQuery<O>, O: MaxEncodedLen, N: MaxEncodedLen, C: Convert<N, O>>
314	QueueFootprintQuery<N> for TransformOrigin<E, O, N, C>
315{
316	type MaxMessageLen = E::MaxMessageLen;
317
318	fn footprint(origin: N) -> QueueFootprint {
319		E::footprint(C::convert(origin))
320	}
321
322	fn get_batches_footprints<'a>(
323		origin: N,
324		msgs: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
325		total_pages_limit: u32,
326	) -> BatchesFootprints {
327		E::get_batches_footprints(C::convert(origin), msgs, total_pages_limit)
328	}
329}
330
/// Handles incoming messages for a single origin.
///
/// This mirrors [`EnqueueMessage`], except that the origin is implied instead of being passed
/// as an argument.
pub trait HandleMessage {
	/// The maximal length any enqueued message may have.
	type MaxMessageLen: Get<u32>;

	/// Enqueue a single `message` with an implied origin.
	fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>);

	/// Enqueue multiple `messages` from an implied origin.
	fn handle_messages<'a>(
		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
	);

	/// Signal that the processing of any remaining unprocessed messages should happen only
	/// lazily, not proactively.
	fn sweep_queue();
}
347
348/// Adapter type to transform an [`EnqueueMessage`] with an origin into a [`HandleMessage`] impl.
349pub struct EnqueueWithOrigin<E, O>(PhantomData<(E, O)>);
350impl<E: EnqueueMessage<O::Type>, O: TypedGet> HandleMessage for EnqueueWithOrigin<E, O>
351where
352	O::Type: MaxEncodedLen,
353{
354	type MaxMessageLen = E::MaxMessageLen;
355
356	fn handle_message(message: BoundedSlice<u8, Self::MaxMessageLen>) {
357		E::enqueue_message(message, O::get());
358	}
359
360	fn handle_messages<'a>(
361		messages: impl Iterator<Item = BoundedSlice<'a, u8, Self::MaxMessageLen>>,
362	) {
363		E::enqueue_messages(messages, O::get());
364	}
365
366	fn sweep_queue() {
367		E::sweep_queue(O::get());
368	}
369}
370
/// Provides information on paused queues.
pub trait QueuePausedQuery<Origin> {
	/// Whether the queue that belongs to `origin` is paused.
	///
	/// Returns `true` if the queue is paused and `false` otherwise.
	fn is_paused(origin: &Origin) -> bool;
}
376
// Implements `QueuePausedQuery` for tuples whose members all implement it.
// NOTE(review): `impl_for_tuples(8)` presumably covers tuples of up to 8 members — confirm
// against the `impl_trait_for_tuples` documentation.
#[impl_trait_for_tuples::impl_for_tuples(8)]
impl<Origin> QueuePausedQuery<Origin> for Tuple {
	// A queue counts as paused as soon as any tuple member reports it as paused.
	fn is_paused(origin: &Origin) -> bool {
		// The `if` below is repeated for every tuple member; the early `return` skips the
		// remaining members once one of them reports the queue as paused.
		for_tuples!( #(
			if Tuple::is_paused(origin) {
				return true;
			}
		)* );
		// No member reported the queue as paused.
		false
	}
}