polkadot_runtime_parachains/
dmp.rs1use crate::{
46 configuration::{self, HostConfiguration},
47 initializer, paras, FeeTracker,
48};
49use alloc::vec::Vec;
50use core::fmt;
51use frame_support::pallet_prelude::*;
52use frame_system::pallet_prelude::BlockNumberFor;
53use polkadot_primitives::{DownwardMessage, Hash, Id as ParaId, InboundDownwardMessage};
54use sp_core::MAX_POSSIBLE_ALLOCATION;
55use sp_runtime::{
56 traits::{BlakeTwo256, Hash as HashT, SaturatedConversion},
57 FixedU128, Saturating,
58};
59use xcm::latest::SendError;
60
61pub use pallet::*;
62
63#[cfg(test)]
64mod tests;
65
66const THRESHOLD_FACTOR: u32 = 2;
67const EXPONENTIAL_FEE_BASE: FixedU128 = FixedU128::from_rational(105, 100); const MESSAGE_SIZE_FEE_BASE: FixedU128 = FixedU128::from_rational(1, 1000); #[cfg_attr(test, derive(Debug))]
72pub enum QueueDownwardMessageError {
73 ExceedsMaxMessageSize,
75 Unroutable,
77}
78
79impl From<QueueDownwardMessageError> for SendError {
80 fn from(err: QueueDownwardMessageError) -> Self {
81 match err {
82 QueueDownwardMessageError::ExceedsMaxMessageSize => SendError::ExceedsMaxMessageSize,
83 QueueDownwardMessageError::Unroutable => SendError::Unroutable,
84 }
85 }
86}
87
/// Reasons for rejecting a candidate's claimed `processed_downward_messages` count.
pub(crate) enum ProcessedDownwardMessagesAcceptanceErr {
    /// The queue holds a message that had to be processed, yet the candidate processed none.
    AdvancementRule,
    /// The candidate claims to have processed more messages than the queue contains.
    Underflow { processed_downward_messages: u32, dmq_length: u32 },
}

/// Hand-written `Debug` that prints a human-readable explanation of the rejection instead of
/// the variant name.
impl fmt::Debug for ProcessedDownwardMessagesAcceptanceErr {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::AdvancementRule => {
                f.write_str("DMQ is not empty, but processed_downward_messages is 0")
            },
            Self::Underflow { processed_downward_messages, dmq_length } => write!(
                f,
                "processed_downward_messages = {}, but dmq_length is only {}",
                processed_downward_messages, dmq_length,
            ),
        }
    }
}
112
113#[frame_support::pallet]
114pub mod pallet {
115 use super::*;
116
117 #[pallet::pallet]
118 #[pallet::without_storage_info]
119 pub struct Pallet<T>(_);
120
121 #[pallet::config]
122 pub trait Config: frame_system::Config + configuration::Config + paras::Config {}
123
124 #[pallet::storage]
126 pub type DownwardMessageQueues<T: Config> = StorageMap<
127 _,
128 Twox64Concat,
129 ParaId,
130 Vec<InboundDownwardMessage<BlockNumberFor<T>>>,
131 ValueQuery,
132 >;
133
134 #[pallet::storage]
142 pub(crate) type DownwardMessageQueueHeads<T: Config> =
143 StorageMap<_, Twox64Concat, ParaId, Hash, ValueQuery>;
144
145 #[pallet::type_value]
147 pub fn InitialFactor() -> FixedU128 {
148 FixedU128::from_u32(1)
149 }
150
151 #[pallet::storage]
153 pub(crate) type DeliveryFeeFactor<T: Config> =
154 StorageMap<_, Twox64Concat, ParaId, FixedU128, ValueQuery, InitialFactor>;
155}
156impl<T: Config> Pallet<T> {
158 pub(crate) fn initializer_initialize(_now: BlockNumberFor<T>) -> Weight {
160 Weight::zero()
161 }
162
/// Block finalization routine for this pallet; intentionally a no-op.
pub(crate) fn initializer_finalize() {}
165
166 pub(crate) fn initializer_on_new_session(
168 _notification: &initializer::SessionChangeNotification<BlockNumberFor<T>>,
169 outgoing_paras: &[ParaId],
170 ) {
171 Self::perform_outgoing_para_cleanup(outgoing_paras);
172 }
173
174 fn perform_outgoing_para_cleanup(outgoing: &[ParaId]) {
177 for outgoing_para in outgoing {
178 Self::clean_dmp_after_outgoing(outgoing_para);
179 }
180 }
181
182 fn clean_dmp_after_outgoing(outgoing_para: &ParaId) {
184 DownwardMessageQueues::<T>::remove(outgoing_para);
185 DownwardMessageQueueHeads::<T>::remove(outgoing_para);
186 }
187
188 pub fn can_queue_downward_message(
192 config: &HostConfiguration<BlockNumberFor<T>>,
193 para: &ParaId,
194 msg: &DownwardMessage,
195 ) -> Result<(), QueueDownwardMessageError> {
196 let serialized_len = msg.len() as u32;
197 if serialized_len > config.max_downward_message_size {
198 return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
199 }
200
201 if Self::dmq_length(*para) > Self::dmq_max_length(config.max_downward_message_size) {
203 return Err(QueueDownwardMessageError::ExceedsMaxMessageSize)
204 }
205
206 if !paras::Heads::<T>::contains_key(para) {
208 return Err(QueueDownwardMessageError::Unroutable)
209 }
210
211 Ok(())
212 }
213
214 pub fn queue_downward_message(
223 config: &HostConfiguration<BlockNumberFor<T>>,
224 para: ParaId,
225 msg: DownwardMessage,
226 ) -> Result<(), QueueDownwardMessageError> {
227 let serialized_len = msg.len() as u32;
228 Self::can_queue_downward_message(config, ¶, &msg)?;
229
230 let inbound =
231 InboundDownwardMessage { msg, sent_at: frame_system::Pallet::<T>::block_number() };
232
233 DownwardMessageQueueHeads::<T>::mutate(para, |head| {
235 let new_head =
236 BlakeTwo256::hash_of(&(*head, inbound.sent_at, T::Hashing::hash_of(&inbound.msg)));
237 *head = new_head;
238 });
239
240 let q_len = DownwardMessageQueues::<T>::mutate(para, |v| {
241 v.push(inbound);
242 v.len()
243 });
244
245 let threshold =
246 Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
247 if q_len > (threshold as usize) {
248 let message_size_factor = FixedU128::from((serialized_len / 1024) as u128)
249 .saturating_mul(MESSAGE_SIZE_FEE_BASE);
250 Self::increase_fee_factor(para, message_size_factor);
251 }
252
253 Ok(())
254 }
255
256 pub(crate) fn check_processed_downward_messages(
258 para: ParaId,
259 relay_parent_number: BlockNumberFor<T>,
260 processed_downward_messages: u32,
261 ) -> Result<(), ProcessedDownwardMessagesAcceptanceErr> {
262 let dmq_length = Self::dmq_length(para);
263
264 if dmq_length > 0 && processed_downward_messages == 0 {
265 let contents = Self::dmq_contents(para);
269
270 if contents.get(0).map_or(false, |msg| msg.sent_at <= relay_parent_number) {
272 return Err(ProcessedDownwardMessagesAcceptanceErr::AdvancementRule)
273 }
274 }
275
276 if dmq_length < processed_downward_messages {
281 return Err(ProcessedDownwardMessagesAcceptanceErr::Underflow {
282 processed_downward_messages,
283 dmq_length,
284 })
285 }
286
287 Ok(())
288 }
289
290 pub(crate) fn prune_dmq(para: ParaId, processed_downward_messages: u32) {
292 let q_len = DownwardMessageQueues::<T>::mutate(para, |q| {
293 let processed_downward_messages = processed_downward_messages as usize;
294 if processed_downward_messages > q.len() {
295 q.clear();
298 } else {
299 *q = q.split_off(processed_downward_messages);
300 }
301 q.len()
302 });
303
304 let config = configuration::ActiveConfig::<T>::get();
305 let threshold =
306 Self::dmq_max_length(config.max_downward_message_size).saturating_div(THRESHOLD_FACTOR);
307 if q_len <= (threshold as usize) {
308 Self::decrease_fee_factor(para);
309 }
310 }
311
/// Returns the stored queue-chain head for `para`; only compiled for tests.
///
/// A para without an entry yields the default `Hash` value.
#[cfg(test)]
fn dmq_mqc_head(para: ParaId) -> Hash {
    DownwardMessageQueueHeads::<T>::get(&para)
}
318
319 pub(crate) fn dmq_length(para: ParaId) -> u32 {
323 DownwardMessageQueues::<T>::decode_len(¶)
324 .unwrap_or(0)
325 .saturated_into::<u32>()
326 }
327
328 fn dmq_max_length(max_downward_message_size: u32) -> u32 {
329 MAX_POSSIBLE_ALLOCATION.checked_div(max_downward_message_size).unwrap_or(0)
330 }
331
332 pub(crate) fn dmq_contents(
336 recipient: ParaId,
337 ) -> Vec<InboundDownwardMessage<BlockNumberFor<T>>> {
338 DownwardMessageQueues::<T>::get(&recipient)
339 }
340
/// Makes `para` pass the reachability check of [`Self::can_queue_downward_message`].
///
/// It does so by storing a head for the para in `paras::Heads`; the stored head is simply
/// the encoded para id. Only available with the `runtime-benchmarks` or `std` feature.
#[cfg(any(feature = "runtime-benchmarks", feature = "std"))]
pub fn make_parachain_reachable(para: impl Into<ParaId>) {
    let para: ParaId = para.into();
    let placeholder_head = para.encode();
    paras::Heads::<T>::insert(para, placeholder_head);
}
349}
350
351impl<T: Config> FeeTracker for Pallet<T> {
352 type Id = ParaId;
353
354 fn get_fee_factor(id: Self::Id) -> FixedU128 {
355 DeliveryFeeFactor::<T>::get(id)
356 }
357
358 fn increase_fee_factor(id: Self::Id, message_size_factor: FixedU128) -> FixedU128 {
359 DeliveryFeeFactor::<T>::mutate(id, |f| {
360 *f = f.saturating_mul(EXPONENTIAL_FEE_BASE.saturating_add(message_size_factor));
361 *f
362 })
363 }
364
365 fn decrease_fee_factor(id: Self::Id) -> FixedU128 {
366 DeliveryFeeFactor::<T>::mutate(id, |f| {
367 *f = InitialFactor::get().max(*f / EXPONENTIAL_FEE_BASE);
368 *f
369 })
370 }
371}
372
/// Benchmark-only hook that prepares a para so downward messages can be queued for it.
#[cfg(feature = "runtime-benchmarks")]
impl<T: Config> crate::EnsureForParachain for Pallet<T> {
    /// Delegates to [`Pallet::make_parachain_reachable`].
    fn ensure(para: ParaId) {
        Pallet::<T>::make_parachain_reachable(para)
    }
}
378}