use std::future::Future;
use std::mem::MaybeUninit;
use std::pin::Pin;
use heph::actor::NewActor;
use heph::supervisor::Supervisor;
use heph_inbox::Manager;
use log::{debug, trace};
use crate::process::{self, ActorProcess, FutureProcess, Process, ProcessId};
use crate::spawn::options::Priority;
use crate::{ptr_as_usize, ThreadSafe};
mod inactive;
mod runqueue;
#[cfg(test)]
mod tests;
use inactive::Inactive;
use runqueue::RunQueue;
pub(super) type ProcessData = process::ProcessData<dyn Process + Send + Sync>;
#[derive(Debug)]
pub(super) struct Scheduler {
ready: RunQueue,
inactive: Inactive,
}
impl Scheduler {
pub(super) fn new() -> Scheduler {
Scheduler {
ready: RunQueue::empty(),
inactive: Inactive::empty(),
}
}
pub(crate) fn ready(&self) -> usize {
self.ready.len()
}
pub(crate) fn inactive(&self) -> usize {
self.inactive.len()
}
pub(super) fn has_process(&self) -> bool {
let has_inactive = self.inactive.has_process();
has_inactive || self.has_ready_process()
}
pub(super) fn has_ready_process(&self) -> bool {
self.ready.has_process()
}
pub(super) fn add_actor<'s>(&'s self) -> AddActor<'s> {
AddActor {
scheduler: self,
alloc: Box::new_uninit(),
}
}
pub(super) fn add_future<Fut>(&self, future: Fut, priority: Priority)
where
Fut: Future<Output = ()> + Send + Sync + 'static,
{
let process = Box::pin(ProcessData::new(
priority,
Box::pin(FutureProcess::<Fut, ThreadSafe>::new(future)),
));
debug!(pid = process.as_ref().id().0; "spawning thread-safe future");
self.ready.add(process)
}
pub(super) fn mark_ready(&self, pid: ProcessId) {
trace!(pid = pid.0; "marking process as ready");
self.inactive.mark_ready(pid, &self.ready);
}
pub(super) fn remove(&self) -> Option<Pin<Box<ProcessData>>> {
self.ready.remove()
}
pub(super) fn add_process(&self, process: Pin<Box<ProcessData>>) {
let pid = process.as_ref().id();
trace!(pid = pid.0; "adding back process");
self.inactive.add(process, &self.ready);
}
#[allow(clippy::unused_self)] pub(super) fn complete(&self, process: Pin<Box<ProcessData>>) {
let pid = process.as_ref().id();
trace!(pid = pid.0; "removing process");
self.inactive.complete(process);
}
}
pub(super) struct AddActor<'s> {
scheduler: &'s Scheduler,
alloc: Box<MaybeUninit<ProcessData>>,
}
impl<'s> AddActor<'s> {
pub(super) const fn pid(&self) -> ProcessId {
#[allow(clippy::borrow_as_ptr)]
ProcessId(ptr_as_usize(&*self.alloc as *const _))
}
pub(super) fn add<S, NA>(
self,
priority: Priority,
supervisor: S,
new_actor: NA,
actor: NA::Actor,
inbox: Manager<NA::Message>,
is_ready: bool,
) where
S: Supervisor<NA> + Send + Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + Sync + 'static,
NA::Actor: Send + Sync + 'static,
NA::Message: Send,
{
debug_assert!(
inactive::ok_ptr(self.alloc.as_ptr() as *const ()),
"SKIP_BITS invalid"
);
let process = ProcessData::new(
priority,
Box::pin(ActorProcess::new(supervisor, new_actor, actor, inbox)),
);
let AddActor {
scheduler,
mut alloc,
} = self;
let process: Pin<_> = unsafe {
let _ = alloc.write(process);
alloc.assume_init().into()
};
if is_ready {
scheduler.ready.add(process);
} else {
scheduler.add_process(process)
}
}
}