mod mock_helpers;
#[cfg(test)]
mod tests;
use crate::{
configuration, on_demand,
paras::AssignCoretime,
scheduler::common::{Assignment, AssignmentProvider},
ParaId,
};
use alloc::{vec, vec::Vec};
use pezframe_support::{defensive, pezpallet_prelude::*};
use pezframe_system::pezpallet_prelude::*;
use pezkuwi_primitives::CoreIndex;
use pezpallet_broker::CoreAssignment;
use pezsp_runtime::traits::{One, Saturating};
pub use pezpallet::*;
/// A share expressed in parts of 57,600.
///
/// Thin wrapper around a `u16`. The associated constants `ZERO` and `FULL` are `Self(0)` and
/// `Self(57600)`; the arithmetic helpers on this type never produce a value above `FULL`.
#[derive(
RuntimeDebug,
Clone,
Copy,
PartialEq,
Eq,
PartialOrd,
Ord,
Encode,
Decode,
DecodeWithMemTracking,
TypeInfo,
)]
pub struct PartsOf57600(u16);
impl PartsOf57600 {
pub const ZERO: Self = Self(0);
pub const FULL: Self = Self(57600);
pub fn new_saturating(v: u16) -> Self {
Self::ZERO.saturating_add(Self(v))
}
pub fn is_full(&self) -> bool {
*self == Self::FULL
}
pub fn saturating_add(self, rhs: Self) -> Self {
let inner = self.0.saturating_add(rhs.0);
if inner > 57600 {
Self(57600)
} else {
Self(inner)
}
}
pub fn saturating_sub(self, rhs: Self) -> Self {
Self(self.0.saturating_sub(rhs.0))
}
pub fn checked_add(self, rhs: Self) -> Option<Self> {
let inner = self.0.saturating_add(rhs.0);
if inner > 57600 {
None
} else {
Some(Self(inner))
}
}
}
/// A set of assignments queued for one core.
///
/// Stored in `CoreSchedules` under the `(block number, core index)` key.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
struct Schedule<N> {
/// The assignments, each paired with the share it was given.
assignments: Vec<(CoreAssignment, PartsOf57600)>,
/// Optional block number at which this schedule stops being valid.
///
/// `ensure_workload` skips a queued schedule whose `end_hint` is `<= now`; `None` means the
/// schedule never expires on its own.
end_hint: Option<N>,
/// Block number under which the following schedule of the same core is stored, if any.
next_schedule: Option<N>,
}
/// Per-core bookkeeping, stored in `CoreDescriptors`.
///
/// Combines the pointers into the queue of not yet loaded schedules with the state of the
/// workload that is currently being served.
#[derive(Encode, Decode, TypeInfo, Default)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
struct CoreDescriptor<N> {
/// First and last queued schedule of this core; `None` if nothing is queued.
queue: Option<QueueDescriptor<N>>,
/// The workload currently being served; `None` if there is none.
current_work: Option<WorkState<N>>,
}
/// Pointers into `CoreSchedules` for one core.
///
/// The queued schedules form a singly linked list via `Schedule::next_schedule`.
#[derive(Encode, Decode, TypeInfo, Copy, Clone)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug))]
struct QueueDescriptor<N> {
/// Block number of the first queued schedule, i.e. the one `ensure_workload` loads next.
first: N,
/// Block number of the last queued schedule, i.e. the one `assign_core` appends to or updates.
last: N,
}
/// The workload a core is currently serving, created from a `Schedule`.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone))]
struct WorkState<N> {
/// The assignments together with the bookkeeping of how much of each has been served.
assignments: Vec<(CoreAssignment, AssignmentState)>,
/// Optional block number at which this workload ends.
///
/// `ensure_workload` drops the workload once `end_hint <= now`.
end_hint: Option<N>,
/// Index into `assignments` of the entry served by the next pop.
///
/// It is reduced modulo the number of assignments before it is used.
pos: u16,
/// Amount subtracted from an assignment's `remaining` each time it is served.
///
/// Initialised to the smallest ratio among the assignments.
step: PartsOf57600,
}
/// Serving state of a single assignment inside a `WorkState`.
#[derive(Encode, Decode, TypeInfo)]
#[cfg_attr(test, derive(PartialEq, RuntimeDebug, Clone, Copy))]
struct AssignmentState {
/// The share this assignment was given when it was scheduled.
ratio: PartsOf57600,
/// Parts left before the assigner moves on to the next assignment.
///
/// Starts out equal to `ratio` and shrinks by `WorkState::step` every time the assignment is
/// served. Once it drops below `step`, `ratio` is added back on top of what is left and the
/// position advances to the next assignment.
remaining: PartsOf57600,
}
impl<N> From<Schedule<N>> for WorkState<N> {
	/// Turns a queued schedule into a fresh work state.
	///
	/// The step width becomes the smallest share among the assignments, every assignment starts
	/// with `remaining == ratio`, and serving starts at position 0. The schedule's
	/// `next_schedule` pointer is discarded.
	fn from(schedule: Schedule<N>) -> Self {
		let Schedule { assignments: scheduled, end_hint, next_schedule: _ } = schedule;

		let step = match scheduled.iter().map(|(_, parts)| *parts).min() {
			Some(smallest) => smallest,
			None => {
				// Nothing to serve; fall back to a step of a single part.
				log::debug!("assignments of a `Schedule` should never be empty.");
				PartsOf57600(1)
			},
		};

		let mut assignments = Vec::with_capacity(scheduled.len());
		for (assignment, ratio) in scheduled {
			assignments.push((assignment, AssignmentState { ratio, remaining: ratio }));
		}

		Self { assignments, end_hint, pos: 0, step }
	}
}
// Pallet declaration: configuration trait, the two storage maps and the error type.
#[pezframe_support::pezpallet]
pub mod pezpallet {
use super::*;
#[pezpallet::pezpallet]
#[pezpallet::without_storage_info]
pub struct Pezpallet<T>(_);
#[pezpallet::config]
pub trait Config: pezframe_system::Config + configuration::Config + on_demand::Config {}
/// Queued schedules, keyed by the block number they were scheduled for and the core index.
///
/// An entry is removed from this map when `ensure_workload` loads (or skips) it.
#[pezpallet::storage]
pub(super) type CoreSchedules<T: Config> = StorageMap<
_,
Twox256,
(BlockNumberFor<T>, CoreIndex),
Schedule<BlockNumberFor<T>>,
OptionQuery,
>;
/// Per-core descriptor holding the queue pointers and the currently served workload.
///
/// Cores without an entry yield the default (empty) descriptor.
#[pezpallet::storage]
pub(super) type CoreDescriptors<T: Config> = StorageMap<
_,
Twox256,
CoreIndex,
CoreDescriptor<BlockNumberFor<T>>,
ValueQuery,
GetDefault,
>;
#[pezpallet::hooks]
impl<T: Config> Hooks<BlockNumberFor<T>> for Pezpallet<T> {}
#[pezpallet::error]
pub enum Error<T> {
/// `assign_core` was called with an empty list of assignments.
AssignmentsEmpty,
/// `assign_core` was called with a `begin` that lies before the last queued schedule;
/// only appending after it or updating it is allowed.
DisallowedInsert,
}
}
impl<T: Config> AssignmentProvider<BlockNumberFor<T>> for Pezpallet<T> {
fn pop_assignment_for_core(core_idx: CoreIndex) -> Option<Assignment> {
let now = pezframe_system::Pezpallet::<T>::block_number();
CoreDescriptors::<T>::mutate(core_idx, |core_state| {
Self::ensure_workload(now, core_idx, core_state);
let work_state = core_state.current_work.as_mut()?;
work_state.pos = work_state.pos % work_state.assignments.len() as u16;
let (a_type, a_state) = &mut work_state
.assignments
.get_mut(work_state.pos as usize)
.expect("We limited pos to the size of the vec one line above. qed");
a_state.remaining = a_state.remaining.saturating_sub(work_state.step);
if a_state.remaining < work_state.step {
work_state.pos += 1;
a_state.remaining = a_state.remaining.saturating_add(a_state.ratio);
}
match a_type {
CoreAssignment::Idle => None,
CoreAssignment::Pool => {
on_demand::Pezpallet::<T>::pop_assignment_for_core(core_idx)
},
CoreAssignment::Task(para_id) => Some(Assignment::Bulk((*para_id).into())),
}
})
}
fn report_processed(assignment: Assignment) {
match assignment {
Assignment::Pool { para_id, core_index } => {
on_demand::Pezpallet::<T>::report_processed(para_id, core_index)
},
Assignment::Bulk(_) => {},
}
}
fn push_back_assignment(assignment: Assignment) {
match assignment {
Assignment::Pool { para_id, core_index } => {
on_demand::Pezpallet::<T>::push_back_assignment(para_id, core_index)
},
Assignment::Bulk(_) => {
},
}
}
#[cfg(any(feature = "runtime-benchmarks", test))]
fn get_mock_assignment(_: CoreIndex, para_id: pezkuwi_primitives::Id) -> Assignment {
Assignment::Bulk(para_id)
}
fn assignment_duplicated(assignment: &Assignment) {
match assignment {
Assignment::Pool { para_id, core_index } => {
on_demand::Pezpallet::<T>::assignment_duplicated(*para_id, *core_index)
},
Assignment::Bulk(_) => {},
}
}
}
impl<T: Config> Pezpallet<T> {
	/// Brings `descriptor` up to date for block `now`.
	///
	/// 1. Drops the current workload if its `end_hint` has been reached.
	/// 2. If the first queued schedule is due (`queue.first <= now`), walks the queue, removing
	///    every due schedule from `CoreSchedules`, until it finds one that has not expired yet.
	///    That schedule replaces the current workload. If every due schedule has already
	///    expired, the core is left without a workload.
	///
	/// A schedule whose begin block lies in the future is never loaded: when the walk reaches
	/// such a schedule it stops and leaves it at the head of the queue. This keeps the outcome
	/// independent of how often the core was polled in the meantime.
	fn ensure_workload(
		now: BlockNumberFor<T>,
		core_idx: CoreIndex,
		descriptor: &mut CoreDescriptor<BlockNumberFor<T>>,
	) {
		// Workload expired?
		if descriptor
			.current_work
			.as_ref()
			.and_then(|w| w.end_hint)
			.map_or(false, |e| e <= now)
		{
			descriptor.current_work = None;
		}

		let Some(queue) = descriptor.queue else {
			// Nothing queued.
			return;
		};

		let mut next_scheduled = queue.first;
		if next_scheduled > now {
			// Not yet due; the current workload (if any) stays in place.
			return;
		}

		// Yields the schedule to activate (if any) and the new head of the queue (if any).
		let (update, new_first) = loop {
			let Some(update) = CoreSchedules::<T>::take((next_scheduled, core_idx)) else {
				// The queue points to a missing entry: nothing left to serve or to queue.
				break (None, None);
			};
			// Still good?
			if update.end_hint.map_or(true, |e| e > now) {
				let successor = update.next_schedule;
				break (Some(update), successor);
			}
			// Expired before it was ever loaded; look at its successor.
			match update.next_schedule {
				// The successor is due as well: keep walking.
				Some(n) if n <= now => next_scheduled = n,
				// The successor only starts in the future (keep it queued) or there is none.
				later_or_none => break (None, later_or_none),
			}
		};

		descriptor.current_work = update.map(Into::into);
		descriptor.queue = new_first.map(|new_first| {
			QueueDescriptor {
				first: new_first,
				// `last` is unaffected as long as the queue is not empty.
				last: queue.last,
			}
		});
	}

	/// Queues `assignments` for `core_idx`, to be served from block `begin` on.
	///
	/// * `begin` must not lie before the last queued schedule of the core. If it equals the last
	///   queued block, the new assignments are appended to the ones already stored there and
	///   the stored `end_hint` is replaced by the given one.
	/// * `end_hint` optionally names the block at which the schedule stops being valid.
	///
	/// # Errors
	///
	/// * `Error::AssignmentsEmpty` if `assignments` is empty.
	/// * `Error::DisallowedInsert` if `begin` lies before the last queued schedule.
	pub fn assign_core(
		core_idx: CoreIndex,
		begin: BlockNumberFor<T>,
		mut assignments: Vec<(CoreAssignment, PartsOf57600)>,
		end_hint: Option<BlockNumberFor<T>>,
	) -> Result<(), DispatchError> {
		// There has to be at least one assignment.
		ensure!(!assignments.is_empty(), Error::<T>::AssignmentsEmpty);

		CoreDescriptors::<T>::mutate(core_idx, |core_descriptor| {
			let new_queue = match core_descriptor.queue {
				Some(queue) => {
					ensure!(begin >= queue.last, Error::<T>::DisallowedInsert);

					// Appending a new entry: link it from the current tail of the queue.
					if begin > queue.last {
						CoreSchedules::<T>::mutate((queue.last, core_idx), |schedule| {
							if let Some(schedule) = schedule.as_mut() {
								debug_assert!(schedule.next_schedule.is_none(), "queue.end was supposed to be the end, so the next item must be `None`!");
								schedule.next_schedule = Some(begin);
							} else {
								defensive!("Queue end entry does not exist?");
							}
						});
					}

					// Insert at `begin`, merging with assignments already stored there.
					CoreSchedules::<T>::mutate((begin, core_idx), |schedule| {
						let assignments = if let Some(mut old_schedule) = schedule.take() {
							old_schedule.assignments.append(&mut assignments);
							old_schedule.assignments
						} else {
							assignments
						};
						*schedule = Some(Schedule { assignments, end_hint, next_schedule: None });
					});

					QueueDescriptor { first: queue.first, last: begin }
				},
				None => {
					// Queue is empty: the new schedule is both its head and its tail.
					CoreSchedules::<T>::insert(
						(begin, core_idx),
						Schedule { assignments, end_hint, next_schedule: None },
					);
					QueueDescriptor { first: begin, last: begin }
				},
			};
			core_descriptor.queue = Some(new_queue);
			Ok(())
		})
	}
}
impl<T: Config> AssignCoretime for Pezpallet<T> {
fn assign_coretime(id: ParaId) -> DispatchResult {
let current_block = pezframe_system::Pezpallet::<T>::block_number();
let mut config = configuration::ActiveConfig::<T>::get();
let core = config.scheduler_params.num_cores;
config.scheduler_params.num_cores.saturating_inc();
configuration::Pezpallet::<T>::force_set_active_config(config);
let begin = current_block + One::one();
let assignment =
vec![(pezpallet_broker::CoreAssignment::Task(id.into()), PartsOf57600::FULL)];
Pezpallet::<T>::assign_core(CoreIndex(core), begin, assignment, None)
}
}