use pezsp_runtime::traits::Zero;
mod benchmarking;
pub mod migration;
mod mock_helpers;
mod types;
extern crate alloc;
#[cfg(test)]
mod tests;
use crate::{configuration, paras, scheduler::common::Assignment};
use alloc::collections::BinaryHeap;
use core::mem::take;
use pezframe_support::{
pezpallet_prelude::*,
traits::{
defensive_prelude::*,
Currency,
ExistenceRequirement::{self, AllowDeath, KeepAlive},
WithdrawReasons,
},
PalletId,
};
use pezframe_system::{pezpallet_prelude::*, Pezpallet as System};
use pezkuwi_primitives::{CoreIndex, Id as ParaId};
use pezsp_runtime::{
traits::{AccountIdConversion, One, SaturatedConversion},
FixedPointNumber, FixedPointOperand, FixedU128, Perbill, Saturating,
};
use types::{
BalanceOf, CoreAffinityCount, EnqueuedOrder, QueuePushDirection, QueueStatusType,
SpotTrafficCalculationErr,
};
const LOG_TARGET: &str = "runtime::teyrchains::on-demand";
pub use pezpallet::*;
/// Weight functions for the extrinsics of this pezpallet.
///
/// Every function takes a single `u32`; the call sites in this file pass
/// `QueueStatus::<T>::get().size()`, i.e. the current size of the on-demand queue.
pub trait WeightInfo {
/// Weight of the `place_order_allow_death` extrinsic.
fn place_order_allow_death(s: u32) -> Weight;
/// Weight of the `place_order_keep_alive` extrinsic.
fn place_order_keep_alive(s: u32) -> Weight;
/// Weight of the `place_order_with_credits` extrinsic.
fn place_order_with_credits(s: u32) -> Weight;
}
/// A [`WeightInfo`] implementation that reports `Weight::MAX` for every extrinsic,
/// regardless of the queue size passed in.
///
/// NOTE(review): going by the name this is meant for tests/mocks only — confirm before
/// wiring it into a production runtime.
pub struct TestWeightInfo;

impl WeightInfo for TestWeightInfo {
    /// Always `Weight::MAX`; the queue size is ignored.
    fn place_order_allow_death(_s: u32) -> Weight {
        Weight::MAX
    }

    /// Always `Weight::MAX`; the queue size is ignored.
    fn place_order_keep_alive(_s: u32) -> Weight {
        Weight::MAX
    }

    /// Always `Weight::MAX`; the queue size is ignored.
    fn place_order_with_credits(_s: u32) -> Weight {
        Weight::MAX
    }
}
/// How the spot price of an on-demand order is paid (see `do_place_order`).
#[derive(Encode, Decode, TypeInfo, Debug, PartialEq, Clone, Eq)]
enum PaymentType {
/// Deduct the spot price from the sender's entry in the `Credits` storage map.
Credits,
/// Withdraw the spot price from the sender's balance through `Config::Currency`.
Balance,
}
#[pezframe_support::pezpallet]
pub mod pezpallet {
use super::*;
/// Storage version attached to the pezpallet struct below via `storage_version`.
const STORAGE_VERSION: StorageVersion = StorageVersion::new(1);
/// The on-demand pezpallet type; storage, hooks and calls in this file are implemented on it.
#[pezpallet::pezpallet]
#[pezpallet::without_storage_info]
#[pezpallet::storage_version(STORAGE_VERSION)]
pub struct Pezpallet<T>(_);
/// Configuration of the on-demand pezpallet.
#[pezpallet::config]
pub trait Config: pezframe_system::Config + configuration::Config + paras::Config {
/// The runtime event type; must be constructible from this pezpallet's `Event`.
#[allow(deprecated)]
type RuntimeEvent: From<Event<Self>>
+ IsType<<Self as pezframe_system::Config>::RuntimeEvent>;
/// Currency from which the spot price is withdrawn for orders paid with
/// `PaymentType::Balance`.
type Currency: Currency<Self::AccountId>;
/// Weight information for the extrinsics of this pezpallet.
type WeightInfo: WeightInfo;
/// Traffic value used when `QueueStatus` is empty, and the lower bound applied to
/// every value returned by `calculate_spot_traffic`.
#[pezpallet::constant]
type TrafficDefaultValue: Get<FixedU128>;
/// Upper bound on the number of entries kept in the `Revenue` vector.
#[pezpallet::constant]
type MaxHistoricalRevenue: Get<u32>;
/// Identifier from which the pezpallet's own account is derived (see `account_id`).
#[pezpallet::constant]
type PalletId: Get<PalletId>;
}
/// Value `QueueStatus` yields when nothing is stored: traffic set to
/// `Config::TrafficDefaultValue`, every other field at its `Default`.
#[pezpallet::type_value]
pub(super) fn QueueStatusOnEmpty<T: Config>() -> QueueStatusType {
    let traffic = T::TrafficDefaultValue::get();
    QueueStatusType { traffic, ..Default::default() }
}
/// Value `FreeEntries` / `AffinityEntries` yield when nothing is stored: an empty heap.
#[pezpallet::type_value]
pub(super) fn EntriesOnEmpty<T: Config>() -> BinaryHeap<EnqueuedOrder> {
    Default::default()
}
/// For each para, the core it currently has affinity to and a counter, maintained by
/// `increase_affinity` / `decrease_affinity`. No entry means the para has no affinity.
#[pezpallet::storage]
pub(super) type ParaIdAffinity<T: Config> =
StorageMap<_, Twox64Concat, ParaId, CoreAffinityCount, OptionQuery>;
/// Bookkeeping for the order queue: the current traffic value plus the index state
/// used by `push_back` / `push_front` / `consume_index` / `size`.
#[pezpallet::storage]
pub(super) type QueueStatus<T: Config> =
StorageValue<_, QueueStatusType, ValueQuery, QueueStatusOnEmpty<T>>;
/// Queued orders that are not tied to a core: orders enqueued while their para had no
/// `ParaIdAffinity` entry, plus orders released by `decrease_affinity_update_queue`.
#[pezpallet::storage]
pub(super) type FreeEntries<T: Config> =
StorageValue<_, BinaryHeap<EnqueuedOrder>, ValueQuery, EntriesOnEmpty<T>>;
/// Queued orders per core, for paras whose affinity points at that core.
#[pezpallet::storage]
pub(super) type AffinityEntries<T: Config> = StorageMap<
_,
Twox64Concat,
CoreIndex,
BinaryHeap<EnqueuedOrder>,
ValueQuery,
EntriesOnEmpty<T>,
>;
/// Revenue collected from orders, one entry per block: index 0 is the current block
/// (a fresh zero entry is inserted there in `on_initialize`), higher indices are older.
#[pezpallet::storage]
pub type Revenue<T: Config> =
StorageValue<_, BoundedVec<BalanceOf<T>, T::MaxHistoricalRevenue>, ValueQuery>;
/// Credits per account, added by `credit_account` and spent by
/// `place_order_with_credits`.
#[pezpallet::storage]
pub type Credits<T: Config> =
StorageMap<_, Blake2_128Concat, T::AccountId, BalanceOf<T>, ValueQuery>;
/// Events emitted by this pezpallet.
#[pezpallet::event]
#[pezpallet::generate_deposit(pub(super) fn deposit_event)]
pub enum Event<T: Config> {
/// An order for `para_id` was placed by `ordered_by` at `spot_price`.
OnDemandOrderPlaced { para_id: ParaId, spot_price: BalanceOf<T>, ordered_by: T::AccountId },
/// The traffic value changed; `spot_price` is the new traffic multiplied by the
/// configured on-demand base fee.
SpotPriceSet { spot_price: BalanceOf<T> },
/// `amount` credits were added to `who` (see `credit_account`).
AccountCredited { who: T::AccountId, amount: BalanceOf<T> },
}
/// Errors returned by the extrinsics of this pezpallet.
#[pezpallet::error]
pub enum Error<T> {
/// The queue size is not below the configured `on_demand_queue_max_size`.
QueueFull,
/// The current spot price exceeds the `max_amount` the caller is willing to pay.
SpotPriceHigherThanMaxAmount,
/// The caller's `Credits` entry is smaller than the current spot price.
InsufficientCredits,
}
#[pezpallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pezpallet<T> {
    /// Per-block housekeeping: open a new revenue slot and refresh the spot traffic.
    fn on_initialize(_now: BlockNumberFor<T>) -> Weight {
        // Insert a zero entry for the new block at index 0. If that pushes an old entry
        // out of the bounded vector, fold its value into the entry that is now last so
        // the unclaimed revenue is not lost.
        Revenue::<T>::mutate(|revenue| {
            let pushed_out =
                revenue.force_insert_keep_left(0, 0u32.into()).defensive_unwrap_or(None);
            if let Some(overdue) = pushed_out {
                if let Some(oldest) = revenue.last_mut() {
                    *oldest = oldest.saturating_add(overdue);
                }
            }
        });

        // Recompute the traffic every block, including blocks without any order.
        let config = configuration::ActiveConfig::<T>::get();
        QueueStatus::<T>::mutate(|status| Self::update_spot_traffic(&config, status));

        // Reads: `Revenue`, `ActiveConfig`, `QueueStatus`. Writes: `Revenue`, `QueueStatus`.
        T::DbWeight::get().reads_writes(3, 2)
    }
}
#[pezpallet::call]
impl<T: Config> Pezpallet<T> {
    /// Place a single on-demand order for `para_id`, paid from the caller's balance
    /// with `ExistenceRequirement::AllowDeath`.
    ///
    /// Parameters:
    /// - `origin`: must be signed; the signer pays for the order.
    /// - `max_amount`: the highest spot price the caller accepts.
    /// - `para_id`: the para the order is placed for.
    ///
    /// Errors:
    /// - `SpotPriceHigherThanMaxAmount`
    /// - `QueueFull`
    /// - any error returned by `Currency::withdraw`
    ///
    /// Events:
    /// - `OnDemandOrderPlaced`
    #[pezpallet::call_index(0)]
    #[pezpallet::weight(<T as Config>::WeightInfo::place_order_allow_death(QueueStatus::<T>::get().size()))]
    // NOTE(review): presumably silences deprecation warnings in the macro-generated code
    // that references this deprecated call — confirm.
    #[allow(deprecated)]
    #[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
    pub fn place_order_allow_death(
        origin: OriginFor<T>,
        max_amount: BalanceOf<T>,
        para_id: ParaId,
    ) -> DispatchResult {
        let who = ensure_signed(origin)?;
        Self::do_place_order(who, max_amount, para_id, AllowDeath, PaymentType::Balance)
    }

    /// Place a single on-demand order for `para_id`, paid from the caller's balance
    /// with `ExistenceRequirement::KeepAlive`.
    ///
    /// Parameters:
    /// - `origin`: must be signed; the signer pays for the order.
    /// - `max_amount`: the highest spot price the caller accepts.
    /// - `para_id`: the para the order is placed for.
    ///
    /// Errors:
    /// - `SpotPriceHigherThanMaxAmount`
    /// - `QueueFull`
    /// - any error returned by `Currency::withdraw`
    ///
    /// Events:
    /// - `OnDemandOrderPlaced`
    #[pezpallet::call_index(1)]
    #[pezpallet::weight(<T as Config>::WeightInfo::place_order_keep_alive(QueueStatus::<T>::get().size()))]
    // NOTE(review): presumably silences deprecation warnings in the macro-generated code
    // that references this deprecated call — confirm.
    #[allow(deprecated)]
    #[deprecated(note = "This will be removed in favor of using `place_order_with_credits`")]
    pub fn place_order_keep_alive(
        origin: OriginFor<T>,
        max_amount: BalanceOf<T>,
        para_id: ParaId,
    ) -> DispatchResult {
        let who = ensure_signed(origin)?;
        Self::do_place_order(who, max_amount, para_id, KeepAlive, PaymentType::Balance)
    }

    /// Place a single on-demand order for `para_id`, paid from the caller's `Credits`.
    ///
    /// Parameters:
    /// - `origin`: must be signed; the signer's credits are charged.
    /// - `max_amount`: the highest spot price the caller accepts.
    /// - `para_id`: the para the order is placed for.
    ///
    /// Errors:
    /// - `SpotPriceHigherThanMaxAmount`
    /// - `QueueFull`
    /// - `InsufficientCredits`
    ///
    /// Events:
    /// - `OnDemandOrderPlaced`
    #[pezpallet::call_index(2)]
    #[pezpallet::weight(<T as Config>::WeightInfo::place_order_with_credits(QueueStatus::<T>::get().size()))]
    pub fn place_order_with_credits(
        origin: OriginFor<T>,
        max_amount: BalanceOf<T>,
        para_id: ParaId,
    ) -> DispatchResult {
        let who = ensure_signed(origin)?;
        // The existence requirement is only consulted on the `Balance` payment path.
        Self::do_place_order(who, max_amount, para_id, KeepAlive, PaymentType::Credits)
    }
}
}
impl<T: Config> Pezpallet<T>
where
BalanceOf<T>: FixedPointOperand,
{
/// Take the next queued order that may run on `core_index` and return it as an
/// `Assignment::Pool`, or `None` if neither this core's affinity heap nor the free
/// heap yields an entry.
///
/// On success the order's index is consumed in `QueueStatus` and the para's affinity
/// to `core_index` is increased.
///
/// The three storage items are updated through nested `try_mutate` calls, which are
/// expected to persist a change only when their closure returns `Ok`.
pub fn pop_assignment_for_core(core_index: CoreIndex) -> Option<Assignment> {
let entry: Result<EnqueuedOrder, ()> = QueueStatus::<T>::try_mutate(|queue_status| {
AffinityEntries::<T>::try_mutate(core_index, |affinity_entries| {
let free_entry = FreeEntries::<T>::try_mutate(|free_entries| {
// Compare the heads of both heaps to decide where to take the order from.
let affinity_next = affinity_entries.peek();
let free_next = free_entries.peek();
let pick_free = match (affinity_next, free_next) {
// No affinity entry: try the free heap (which may be empty as well).
(None, _) => true,
(Some(_), None) => false,
// Both present: decided by `EnqueuedOrder`'s own ordering, which is defined
// in `types` and not visible from here.
(Some(a), Some(f)) => f < a,
};
if pick_free {
let entry = free_entries.pop().ok_or(())?;
// Every other free order of the same para is moved to this core's affinity
// heap; the rest stays free.
let (mut affinities, free): (BinaryHeap<_>, BinaryHeap<_>) =
take(free_entries)
.into_iter()
.partition(|e| e.para_id == entry.para_id);
affinity_entries.append(&mut affinities);
*free_entries = free;
Ok(entry)
} else {
// Leave the free heap alone; the affinity heap is used below.
Err(())
}
});
// Fall back to this core's affinity heap when no free entry was taken.
let entry = free_entry.or_else(|()| affinity_entries.pop().ok_or(()))?;
queue_status.consume_index(entry.idx);
Ok(entry)
})
});
let assignment = entry.map(|e| Assignment::Pool { para_id: e.para_id, core_index }).ok()?;
// Record that the para is now being served on this core.
Pezpallet::<T>::increase_affinity(assignment.para_id(), core_index);
Some(assignment)
}
pub fn assignment_duplicated(para_id: ParaId, core_index: CoreIndex) {
Pezpallet::<T>::increase_affinity(para_id, core_index);
}
pub fn report_processed(para_id: ParaId, core_index: CoreIndex) {
Pezpallet::<T>::decrease_affinity_update_queue(para_id, core_index);
}
pub fn push_back_assignment(para_id: ParaId, core_index: CoreIndex) {
Pezpallet::<T>::decrease_affinity_update_queue(para_id, core_index);
QueueStatus::<T>::mutate(|queue_status| {
Pezpallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Front);
});
}
pub fn credit_account(who: T::AccountId, amount: BalanceOf<T>) {
Credits::<T>::mutate(who.clone(), |credits| {
*credits = credits.saturating_add(amount);
});
Pezpallet::<T>::deposit_event(Event::<T>::AccountCredited { who, amount });
}
fn do_place_order(
sender: <T as pezframe_system::Config>::AccountId,
max_amount: BalanceOf<T>,
para_id: ParaId,
existence_requirement: ExistenceRequirement,
payment_type: PaymentType,
) -> DispatchResult {
let config = configuration::ActiveConfig::<T>::get();
QueueStatus::<T>::mutate(|queue_status| {
Self::update_spot_traffic(&config, queue_status);
let traffic = queue_status.traffic;
let spot_price: BalanceOf<T> = traffic.saturating_mul_int(
config.scheduler_params.on_demand_base_fee.saturated_into::<BalanceOf<T>>(),
);
ensure!(spot_price.le(&max_amount), Error::<T>::SpotPriceHigherThanMaxAmount);
ensure!(
queue_status.size() < config.scheduler_params.on_demand_queue_max_size,
Error::<T>::QueueFull
);
match payment_type {
PaymentType::Balance => {
let amt = T::Currency::withdraw(
&sender,
spot_price,
WithdrawReasons::FEE,
existence_requirement,
)?;
let pot = Self::account_id();
if !System::<T>::account_exists(&pot) {
System::<T>::inc_providers(&pot);
}
T::Currency::resolve_creating(&pot, amt);
},
PaymentType::Credits => {
let credits = Credits::<T>::get(&sender);
let new_credits_value =
credits.checked_sub(&spot_price).ok_or(Error::<T>::InsufficientCredits)?;
if new_credits_value.is_zero() {
Credits::<T>::remove(&sender);
} else {
Credits::<T>::insert(&sender, new_credits_value);
}
},
}
Revenue::<T>::mutate(|bounded_revenue| {
if let Some(current_block) = bounded_revenue.get_mut(0) {
*current_block = current_block.saturating_add(spot_price);
} else {
bounded_revenue.try_push(spot_price).defensive_ok();
}
});
Pezpallet::<T>::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
Pezpallet::<T>::deposit_event(Event::<T>::OnDemandOrderPlaced {
para_id,
spot_price,
ordered_by: sender,
});
Ok(())
})
}
/// Recompute the traffic for `queue_status` from the current queue size and `config`.
///
/// If the new value differs from the old one it is stored in `queue_status` and a
/// `SpotPriceSet` event with the resulting spot price is emitted. A calculation error
/// is logged at debug level and leaves `queue_status` untouched.
fn update_spot_traffic(
    config: &configuration::HostConfiguration<BlockNumberFor<T>>,
    queue_status: &mut QueueStatusType,
) {
    let params = &config.scheduler_params;
    let previous = queue_status.traffic;

    let recalculated = Self::calculate_spot_traffic(
        previous,
        params.on_demand_queue_max_size,
        queue_status.size(),
        params.on_demand_target_queue_utilization,
        params.on_demand_fee_variability,
    );
    let new_traffic = match recalculated {
        Ok(value) => value,
        Err(err) => {
            log::debug!(
                target: LOG_TARGET,
                "Error calculating spot traffic: {:?}", err
            );
            return;
        },
    };

    // Nothing to store or announce when the traffic did not move.
    if new_traffic == previous {
        return;
    }

    queue_status.traffic = new_traffic;
    let spot_price: BalanceOf<T> = new_traffic
        .saturating_mul_int(params.on_demand_base_fee.saturated_into::<BalanceOf<T>>());
    Self::deposit_event(Event::<T>::SpotPriceSet { spot_price });
}
/// Compute the next traffic multiplier from the current queue utilisation.
///
/// With `ratio = queue_size / queue_capacity`,
/// `diff = |ratio - target_queue_utilisation|` and
/// `half_sq = (variability * diff)^2 / 2` (saturating `FixedU128` arithmetic):
///
/// - `ratio >= target`: `traffic * (1 + diff + half_sq)`
/// - `ratio <  target`: `traffic * (diff - half_sq)`
///
/// The result is never lower than `Config::TrafficDefaultValue`.
///
/// NOTE(review): this describes the formula exactly as implemented. The linear term
/// uses `diff` without `variability`, and the below-target branch has no `1 -` term —
/// confirm against the intended pricing model before changing either.
///
/// Parameters:
/// - `traffic`: the previous multiplier.
/// - `queue_capacity`: maximum size of the order queue.
/// - `queue_size`: current number of queued orders.
/// - `target_queue_utilisation`: desired `queue_size / queue_capacity` ratio.
/// - `variability`: factor applied to `diff` in the quadratic term.
///
/// Errors:
/// - `QueueCapacityIsZero` if `queue_capacity == 0`
/// - `QueueSizeLargerThanCapacity` if `queue_size > queue_capacity`
/// - `Division` if halving the quadratic term fails
fn calculate_spot_traffic(
    traffic: FixedU128,
    queue_capacity: u32,
    queue_size: u32,
    target_queue_utilisation: Perbill,
    variability: Perbill,
) -> Result<FixedU128, SpotTrafficCalculationErr> {
    if queue_capacity == 0 {
        return Err(SpotTrafficCalculationErr::QueueCapacityIsZero);
    }
    if queue_size > queue_capacity {
        return Err(SpotTrafficCalculationErr::QueueSizeLargerThanCapacity);
    }

    let target: FixedU128 = target_queue_utilisation.into();
    let utilisation = FixedU128::from_rational(queue_size.into(), queue_capacity.into());

    // `FixedU128` is unsigned: keep the sign separately and work with the magnitude.
    let at_or_above_target = utilisation >= target;
    let diff = utilisation.max(target) - utilisation.min(target);

    // (variability * diff)^2 / 2
    let scaled = diff.saturating_mul(variability.into());
    let half_sq = scaled
        .saturating_mul(scaled)
        .const_checked_div(2.into())
        .ok_or(SpotTrafficCalculationErr::Division)?;

    let factor = if at_or_above_target {
        diff.saturating_add(half_sq).saturating_add(One::one())
    } else {
        diff.saturating_sub(half_sq)
    };

    Ok(factor.saturating_mul(traffic).max(<T as Config>::TrafficDefaultValue::get()))
}
/// Enqueue a new order for `para_id`.
///
/// The order gets its index from `queue_status` (front or back, per `location`). It is
/// pushed onto the affinity heap of the core the para currently has affinity to, or
/// onto the free heap if the para has no `ParaIdAffinity` entry.
fn add_on_demand_order(
    queue_status: &mut QueueStatusType,
    para_id: ParaId,
    location: QueuePushDirection,
) {
    let idx = match location {
        QueuePushDirection::Front => queue_status.push_front(),
        QueuePushDirection::Back => queue_status.push_back(),
    };
    let affinity = ParaIdAffinity::<T>::get(para_id);
    let order = EnqueuedOrder::new(idx, para_id);
    #[cfg(test)]
    log::debug!(target: LOG_TARGET, "add_on_demand_order, order: {:?}, affinity: {:?}, direction: {:?}", order, affinity, location);

    if let Some(affinity) = affinity {
        AffinityEntries::<T>::mutate(affinity.core_index, |entries| entries.push(order));
    } else {
        FreeEntries::<T>::mutate(|entries| entries.push(order));
    }
}
fn decrease_affinity_update_queue(para_id: ParaId, core_index: CoreIndex) {
let affinity = Pezpallet::<T>::decrease_affinity(para_id, core_index);
#[cfg(not(test))]
debug_assert_ne!(
affinity, None,
"Decreased affinity for a para that has not been served on a core?"
);
if affinity != Some(0) {
return;
}
AffinityEntries::<T>::mutate(core_index, |affinity_entries| {
FreeEntries::<T>::mutate(|free_entries| {
let (mut freed, affinities): (BinaryHeap<_>, BinaryHeap<_>) =
take(affinity_entries).into_iter().partition(|e| e.para_id == para_id);
free_entries.append(&mut freed);
*affinity_entries = affinities;
})
});
}
/// Decrement the affinity count of `para_id` for `core_index`.
///
/// Returns `Some(new_count)` if the para had affinity to `core_index`; the entry is
/// removed when the count reaches zero. Returns `None`, leaving storage as it was, if
/// the para has no affinity entry or its affinity is to a different core.
fn decrease_affinity(para_id: ParaId, core_index: CoreIndex) -> Option<u32> {
    ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| {
        let remaining = match maybe_affinity.as_ref() {
            Some(current) if current.core_index == core_index => {
                current.count.saturating_sub(1)
            },
            _ => return None,
        };

        *maybe_affinity = if remaining > 0 {
            Some(CoreAffinityCount { core_index, count: remaining })
        } else {
            None
        };
        Some(remaining)
    })
}
/// Increment the affinity count of `para_id` for `core_index`.
///
/// A para without an entry gets one with count 1. An existing entry for the same core
/// is incremented (saturating). An existing entry for a different core is left as is.
fn increase_affinity(para_id: ParaId, core_index: CoreIndex) {
    ParaIdAffinity::<T>::mutate(para_id, |maybe_affinity| match maybe_affinity {
        Some(existing) if existing.core_index == core_index => {
            existing.count = existing.count.saturating_add(1);
        },
        // Affinity to another core: nothing to do.
        Some(_) => {},
        None => {
            *maybe_affinity = Some(CoreAffinityCount { core_index, count: 1 });
        },
    })
}
/// Remove and sum up the revenue of all stored blocks older than `when`.
///
/// `Revenue` holds one entry per block with index `i` belonging to block `now - i`.
/// Entries are popped from the end (oldest first) for as long as
/// `when > now - index`; the first entry that does not satisfy this stops the loop.
///
/// Returns the (saturating) sum of the removed entries.
pub fn claim_revenue_until(when: BlockNumberFor<T>) -> BalanceOf<T> {
    let now = <pezframe_system::Pezpallet<T>>::block_number();
    let mut amount: BalanceOf<T> = BalanceOf::<T>::zero();

    Revenue::<T>::mutate(|revenue| {
        // `checked_sub` yields `None` exactly when the vector is empty.
        while let Some(last) = revenue.len().checked_sub(1) {
            let index = last as u32;
            let block_of_last = now.saturating_sub(index.into());
            let claimable = when > block_of_last;
            if !claimable {
                break;
            }
            amount = amount.saturating_add(revenue.pop().defensive_unwrap_or(0u32.into()));
        }
    });

    amount
}
/// The account derived from `Config::PalletId`; it receives the balance withdrawn for
/// orders paid with `PaymentType::Balance`.
pub fn account_id() -> T::AccountId {
    let pallet_id = T::PalletId::get();
    pallet_id.into_account_truncating()
}
/// Test-only: the `ParaIdAffinity` entry stored for `para_id`, if any.
#[cfg(test)]
fn get_affinity_map(para_id: ParaId) -> Option<CoreAffinityCount> {
ParaIdAffinity::<T>::get(para_id)
}
/// Test-only: the affinity heap stored for `core_index`.
#[cfg(test)]
fn get_affinity_entries(core_index: CoreIndex) -> BinaryHeap<EnqueuedOrder> {
AffinityEntries::<T>::get(core_index)
}
/// Test-only: the heap of free (not core-bound) orders.
#[cfg(test)]
fn get_free_entries() -> BinaryHeap<EnqueuedOrder> {
FreeEntries::<T>::get()
}
/// Benchmark helper: enqueue `num` orders for `para_id` at the back of the queue.
///
/// No payment is taken and no event is emitted; only the queue storage is updated.
#[cfg(feature = "runtime-benchmarks")]
pub fn populate_queue(para_id: ParaId, num: u32) {
    QueueStatus::<T>::mutate(|queue_status| {
        (0..num).for_each(|_| {
            Self::add_on_demand_order(queue_status, para_id, QueuePushDirection::Back);
        });
    });
}
/// Test-only: overwrite `QueueStatus` with `new_status`.
#[cfg(test)]
fn set_queue_status(new_status: QueueStatusType) {
QueueStatus::<T>::set(new_status);
}
/// Test-only: the current `QueueStatus`.
#[cfg(test)]
fn get_queue_status() -> QueueStatusType {
QueueStatus::<T>::get()
}
/// Test-only: the configured `TrafficDefaultValue`.
#[cfg(test)]
fn get_traffic_default_value() -> FixedU128 {
<T as Config>::TrafficDefaultValue::get()
}
/// Test-only: the `Revenue` storage as a plain `Vec` (index 0 first).
#[cfg(test)]
fn get_revenue() -> Vec<BalanceOf<T>> {
Revenue::<T>::get().to_vec()
}
}