polkadot_runtime_parachains/
dmp.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Polkadot.
3
4// Polkadot is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Polkadot is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Polkadot.  If not, see <http://www.gnu.org/licenses/>.
16
17//! To prevent Out of Memory errors on the `DownwardMessageQueue`, an
18//! exponential fee factor (`DeliveryFeeFactor`) is set. The fee factor
19//! increments exponentially after the number of messages in the
20//! `DownwardMessageQueue` passes a threshold. This threshold is set as:
21//!
22//! ```ignore
23//! // Maximum max sized messages that can be send to
24//! // the DownwardMessageQueue before it runs out of memory
25//! max_messages = MAX_POSSIBLE_ALLOCATION / max_downward_message_size
26//! threshold = max_messages / THRESHOLD_FACTOR
27//! ```
28//! Based on the THRESHOLD_FACTOR, the threshold is set as a fraction of the
29//! total messages. The `DeliveryFeeFactor` increases for a message over the
30//! threshold by:
31//!
32//! `DeliveryFeeFactor = DeliveryFeeFactor *
33//! (EXPONENTIAL_FEE_BASE + MESSAGE_SIZE_FEE_BASE * encoded_message_size_in_KB)`
34//!
35//! And decreases when the number of messages in the `DownwardMessageQueue` fall
36//! below the threshold by:
37//!
38//! `DeliveryFeeFactor = DeliveryFeeFactor / EXPONENTIAL_FEE_BASE`
39//!
40//! As an extra defensive measure, a `max_messages` hard
41//! limit is set to the number of messages in the DownwardMessageQueue. Messages
42//! that would increase the number of messages in the queue above this hard
43//! limit are dropped.
44
45use crate::{
46	configuration::{self, HostConfiguration},
47	initializer, paras, FeeTracker,
48};
49use alloc::vec::Vec;
50use core::fmt;
51use frame_support::pallet_prelude::*;
52use frame_system::pallet_prelude::BlockNumberFor;
53use polkadot_primitives::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
54use sp_core::MAX_POSSIBLE_ALLOCATION;
55use sp_runtime::{
56	traits::{BlakeTwo256, Hash as HashT, SaturatedConversion},
57	FixedU128, Saturating,
58};
59use xcm::latest::SendError;
60
61pub use pallet::*;
62
63#[cfg(test)]
64mod tests;
65
66const THRESHOLD_FACTOR: u32 = 2;
67const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); // 1.05
68const MESSAGE_SIZE_FEE_BASE: FixedU128 = FixedU128::from_rational(1, 1000); // 0.001
69
70/// An error sending a downward message.
71#[cfg_attr(test, derive(Debug))]
72pub enum QueueDownwardMessageError {
73	/// The message being sent exceeds the configured max message size.
74	ExceedsMaxMessageSize,
75	/// The destination is unknown.
76	Unroutable,
77}
78
79impl From<QueueDownwardMessageError> for SendError {
80	fn from(err: QueueDownwardMessageError) -> Self {
81		match err {
82			QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
83			QueueDownwardMessageError::Unroutable => SendError::Unroutable,
84		}
85	}
86}
87
88/// An error returned by [`Pallet::check_processed_downward_messages`] that indicates an acceptance
89/// check didn't pass.
90pub(crate) enum ProcessedDownwardMessagesAcceptanceErr {
91	/// If there are pending messages then `processed_downward_messages` should be at least 1,
92	AdvancementRule,
93	/// `processed_downward_messages` should not be greater than the number of pending messages.
94	Underflow { processed_downward_messages: u32, dmq_length: u32 },
95}
96
97impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
98	fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
99		use ProcessedDownwardMessagesAcceptanceErr::*;
100		match *self {
101			AdvancementRule => {
102				write!(fmt, "DMQ is not empty, but processed_downward_messages is 0",)
103			},
104			Underflow { processed_downward_messages, dmq_length } => write!(
105				fmt,
106				"processed_downward_messages = {}, but dmq_length is only {}",
107				processed_downward_messages, dmq_length,
108			),
109		}
110	}
111}
112
113#[frame_support::pallet]
114pub mod pallet {
115	use super::*;
116
117	#[pallet::pallet]
118	#[pallet::without_storage_info]
119	pub struct Pallet<T>(_);
120
121	#[pallet::config]
122	pub trait Config: frame_system::Config + configuration::Config + paras::Config {}
123
124	/// The downward messages addressed for a certain para.
125	#[pallet::storage]
126	pub type DownwardMessageQueues<T: Config> = StorageMap<
127		_,
128		Twox64Concat,
129		ParaId,
130		Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
131		ValueQuery,
132	>;
133
134	/// A mapping that stores the downward message queue MQC head for each para.
135	///
136	/// Each link in this chain has a form:
137	/// `(prev_head, B, H(M))`, where
138	/// - `prev_head`: is the previous head hash or zero if none.
139	/// - `B`: is the relay-chain block number in which a message was appended.
140	/// - `H(M)`: is the hash of the message being appended.
141	#[pallet::storage]
142	pub(crate) type DownwardMessageQueueHeads<T: Config> =
143		StorageMap<_, Twox64Concat, ParaId, Hash, ValueQuery>;
144
145	/// Initialization value for the DeliveryFee factor.
146	#[pallet::type_value]
147	pub fn InitialFactor() -> FixedU128 {
148		FixedU128::from_u32(1)
149	}
150
151	/// The factor to multiply the base delivery fee by.
152	#[pallet::storage]
153	pub(crate) type DeliveryFeeFactor<T: Config> =
154		StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
155}
156/// Routines and getters related to downward message passing.
157impl<T: Config> Pallet<T> {
158	/// Block initialization logic, called by initializer.
159	pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
160		Weight::zero()
161	}
162
163	/// Block finalization logic, called by initializer.
164	pub(crate) fn initializer_finalize() {}
165
166	/// Called by the initializer to note that a new session has started.
167	pub(crate) fn initializer_on_new_session(
168		_notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
169		outgoing_paras: &[ParaId],
170	) {
171		Self::perform_outgoing_para_cleanup(outgoing_paras);
172	}
173
174	/// Iterate over all paras that were noted for offboarding and remove all the data
175	/// associated with them.
176	fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
177		for outgoing_para in outgoing {
178			Self::clean_dmp_after_outgoing(outgoing_para);
179		}
180	}
181
182	/// Remove all relevant storage items for an outgoing parachain.
183	fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
184		DownwardMessageQueues::<T>::remove(outgoing_para);
185		DownwardMessageQueueHeads::<T>::remove(outgoing_para);
186	}
187
188	/// Determine whether enqueuing a downward message to a specific recipient para would result
189	/// in an error. If this returns `Ok(())` the caller can be certain that a call to
190	/// `queue_downward_message` with the same parameters will be successful.
191	pub fn can_queue_downward_message(
192		config: &HostConfiguration<BlockNumberFor<T>>,
193		para: &ParaId,
194		msg: &DownwardMessage,
195	) -> Result<(), QueueDownwardMessageError> {
196		let serialized_len = msg.len() as u32;
197		if serialized_len > config.max_downward_message_size {
198			return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
199		}
200
201		// Hard limit on Queue size
202		if Self::dmq_length(*para) > Self::dmq_max_length(config.max_downward_message_size) {
203			return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
204		}
205
206		// If the head exists, we assume the parachain is legit and exists.
207		if !paras::Heads::<T>::contains_key(para) {
208			return Err(QueueDownwardMessageError::Unroutable)
209		}
210
211		Ok(())
212	}
213
214	/// Enqueue a downward message to a specific recipient para.
215	///
216	/// When encoded, the message should not exceed the `config.max_downward_message_size`.
217	/// Otherwise, the message won't be sent and `Err` will be returned.
218	///
219	/// It is possible to send a downward message to a non-existent para. That, however, would lead
220	/// to a dangling storage. If the caller cannot statically prove that the recipient exists
221	/// then the caller should perform a runtime check.
222	pub fn queue_downward_message(
223		config: &HostConfiguration<BlockNumberFor<T>>,
224		para: ParaId,
225		msg: DownwardMessage,
226	) -> Result<(), QueueDownwardMessageError> {
227		let serialized_len = msg.len() as u32;
228		Self::can_queue_downward_message(config, &para, &msg)?;
229
230		let inbound =
231			InboundDownwardMessage { msg, sent_at: frame_system::Pallet::<T>::block_number() };
232
233		// obtain the new link in the MQC and update the head.
234		DownwardMessageQueueHeads::<T>::mutate(para, |head| {
235			let new_head =
236				BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
237			*head = new_head;
238		});
239
240		let q_len = DownwardMessageQueues::<T>::mutate(para, |v| {
241			v.push(inbound);
242			v.len()
243		});
244
245		let threshold =
246			Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
247		if q_len > (threshold as usize) {
248			let message_size_factor = FixedU128::from((serialized_len / 1024) as u128)
249				.saturating_mul(MESSAGE_SIZE_FEE_BASE);
250			Self::increase_fee_factor(para, message_size_factor);
251		}
252
253		Ok(())
254	}
255
256	/// Checks if the number of processed downward messages is valid.
257	pub(crate) fn check_processed_downward_messages(
258		para: ParaId,
259		relay_parent_number: BlockNumberFor<T>,
260		processed_downward_messages: u32,
261	) -> Result<(), ProcessedDownwardMessagesAcceptanceErr> {
262		let dmq_length = Self::dmq_length(para);
263
264		if dmq_length > 0 && processed_downward_messages == 0 {
265			// The advancement rule is for at least one downwards message to be processed
266			// if the queue is non-empty at the relay-parent. Downwards messages are annotated
267			// with the block number, so we compare the earliest (first) against the relay parent.
268			let contents = Self::dmq_contents(para);
269
270			// sanity: if dmq_length is >0 this should always be 'Some'.
271			if contents.get(0).map_or(false, |msg| msg.sent_at <= relay_parent_number) {
272				return Err(ProcessedDownwardMessagesAcceptanceErr::AdvancementRule)
273			}
274		}
275
276		// Note that we might be allowing a parachain to signal that it's processed
277		// messages that hadn't been placed in the queue at the relay_parent.
278		// only 'stupid' parachains would do it and we don't (and can't) force anyone
279		// to act on messages, so the lenient approach is fine here.
280		if dmq_length < processed_downward_messages {
281			return Err(ProcessedDownwardMessagesAcceptanceErr::Underflow {
282				processed_downward_messages,
283				dmq_length,
284			})
285		}
286
287		Ok(())
288	}
289
290	/// Prunes the specified number of messages from the downward message queue of the given para.
291	pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) {
292		let q_len = DownwardMessageQueues::<T>::mutate(para, |q| {
293			let processed_downward_messages = processed_downward_messages as usize;
294			if processed_downward_messages > q.len() {
295				// reaching this branch is unexpected due to the constraint established by
296				// `check_processed_downward_messages`. But better be safe than sorry.
297				q.clear();
298			} else {
299				*q = q.split_off(processed_downward_messages);
300			}
301			q.len()
302		});
303
304		let config = configuration::ActiveConfig::<T>::get();
305		let threshold =
306			Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
307		if q_len <= (threshold as usize) {
308			Self::decrease_fee_factor(para);
309		}
310	}
311
312	/// Returns the Head of Message Queue Chain for the given para or `None` if there is none
313	/// associated with it.
314	#[cfg(test)]
315	fn dmq_mqc_head(para: ParaId) -> Hash {
316		DownwardMessageQueueHeads::<T>::get(&para)
317	}
318
319	/// Returns the number of pending downward messages addressed to the given para.
320	///
321	/// Returns 0 if the para doesn't have an associated downward message queue.
322	pub(crate) fn dmq_length(para: ParaId) -> u32 {
323		DownwardMessageQueues::<T>::decode_len(&para)
324			.unwrap_or(0)
325			.saturated_into::<u32>()
326	}
327
328	fn dmq_max_length(max_downward_message_size: u32) -> u32 {
329		MAX_POSSIBLE_ALLOCATION.checked_div(max_downward_message_size).unwrap_or(0)
330	}
331
332	/// Returns the downward message queue contents for the given para.
333	///
334	/// The most recent messages are the latest in the vector.
335	pub(crate) fn dmq_contents(
336		recipient: ParaId,
337	) -> Vec<InboundDownwardMessage<BlockNumberFor<T>>> {
338		DownwardMessageQueues::<T>::get(&recipient)
339	}
340
341	/// Make the parachain reachable for downward messages.
342	///
343	/// Only useable in benchmarks or tests.
344	#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
345	pub fn make_parachain_reachable(para: impl Into<ParaId>) {
346		let para = para.into();
347		crate::paras::Heads::<T>::insert(para, para.encode());
348	}
349}
350
351impl<T: Config> FeeTracker for Pallet<T> {
352	type Id = ParaId;
353
354	fn get_fee_factor(id: Self::Id) -> FixedU128 {
355		DeliveryFeeFactor::<T>::get(id)
356	}
357
358	fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
359		DeliveryFeeFactor::<T>::mutate(id, |f| {
360			*f = f.saturating_mul(EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor));
361			*f
362		})
363	}
364
365	fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
366		DeliveryFeeFactor::<T>::mutate(id, |f| {
367			*f = InitialFactor::get().max(*f / EXPONENTIAL_FEE_BASE);
368			*f
369		})
370	}
371}
372
373#[cfg(feature = "runtime-benchmarks")]
374impl<T: Config> crate::EnsureForParachain for Pallet<T> {
375	fn ensure(para: ParaId) {
376		Self::make_parachain_reachable(para);
377	}
378}