use crossbeam::channel::Select;
use std::fmt::Debug;
use std::time::Instant;
use zrx_executor::strategy::WorkSharing;
use zrx_executor::{Executor, Strategy};
pub mod action;
mod engine;
pub mod router;
pub mod schedule;
pub mod session;
pub mod signal;
pub mod step;
use engine::queue::{Tasks, Timers};
use engine::{Actions, AsReceiver, Token, TokenFull};
use router::Router;
use schedule::Schedule;
use session::Session;
use signal::{Id, Value};
use step::effect::timer::{IntoDuration, IntoInstant};
use step::effect::Effect;
use step::{Step, Steps};
/// Scheduler.
///
/// Generic over the identifier type `I` and the execution strategy `S`,
/// which defaults to [`WorkSharing`]. The scheduler is driven from the
/// outside by calling one of the `tick` methods.
#[derive(Debug)]
pub struct Scheduler<I, S = WorkSharing>
where
S: Strategy,
{
/// Router that source node senders are registered with on `attach`; it
/// also hands out sessions and is polled while the scheduler is waiting.
router: Router<I>,
/// Attached schedules together with the actions submitted for them.
actions: Actions<I>,
/// Queue receiving the timers requested through `Effect::Timer` steps.
timers: Timers<I>,
/// Queue receiving the tasks requested through `Effect::Task` steps; it
/// is constructed from the executor passed to `new`.
tasks: Tasks<I, S>,
}
impl<I, S> Scheduler<I, S>
where
    I: Id,
    S: Strategy,
{
    /// Creates a scheduler whose task queue is backed by the given executor.
    ///
    /// The router is default-constructed, the action store and the timer
    /// queue are created through their `new` constructors.
    #[must_use]
    pub fn new(executor: Executor<S>) -> Self {
        let router = Router::default();
        let actions = Actions::new();
        let timers = Timers::new();
        let tasks = Tasks::new(executor);
        Self {
            router,
            actions,
            timers,
            tasks,
        }
    }

    /// Attaches a schedule and returns the index it was attached at.
    ///
    /// After the schedule is stored, the sender of every source node in its
    /// graph is registered with the router under a [`Token`] made of the
    /// schedule index (`module`) and the node.
    #[inline]
    pub fn attach<W>(&mut self, schedule: W) -> usize
    where
        W: Into<Schedule<I>>,
    {
        let module = self.actions.attach(schedule.into());
        let graph = &mut self.actions[module].graph;

        // The source nodes are collected up front, so the graph is free to be
        // borrowed mutably again inside the loop.
        let sources: Vec<_> = graph.sources().collect();
        for node in sources {
            if let Some(source) = graph[node].as_source_mut() {
                self.router.add(Token { module, node }, source.sender());
            }
        }
        module
    }

    /// Detaches the schedule at the given index and returns it, if any.
    ///
    /// NOTE(review): only `actions` is touched here - the router entries
    /// added by `attach` are left as they are; confirm this is intended.
    #[inline]
    pub fn detach(&mut self, index: usize) -> Option<Schedule<I>> {
        self.actions.detach(index)
    }

    /// Creates a session for values of type `T` through the router.
    #[inline]
    #[must_use]
    pub fn session<T>(&mut self) -> Session<I, T>
    where
        T: Value,
    {
        self.router.session()
    }

    /// Runs one scheduler step without a caller-supplied deadline.
    ///
    /// If no actions are pending and the scheduler is not empty, this waits
    /// without a deadline until the timer queue, the task queue or something
    /// registered by the router becomes ready.
    #[inline]
    pub fn tick(&mut self) {
        self.process(None);
    }

    /// Runs one scheduler step, waiting no longer than the given deadline.
    #[inline]
    pub fn tick_deadline<T>(&mut self, deadline: T)
    where
        T: IntoInstant,
    {
        let deadline = deadline.into_instant();
        self.process(Some(deadline));
    }

    /// Runs one scheduler step, waiting no longer than the given timeout.
    ///
    /// The timeout is converted into a duration first, and the duration is
    /// then converted into the instant that serves as the deadline.
    #[inline]
    pub fn tick_timeout<T>(&mut self, timeout: T)
    where
        T: IntoDuration,
    {
        let duration = timeout.into_duration();
        let deadline = duration.into_instant();
        self.process(Some(deadline));
    }

    /// Runs one scheduler step with an optional deadline.
    ///
    /// Timers and tasks are always processed first. Afterwards, pending
    /// actions are run if there are any; otherwise the scheduler waits for
    /// new events.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "trace", skip_all)
    )]
    fn process(&mut self, deadline: Option<Instant>) {
        self.process_timers();
        self.process_tasks();

        // Pending actions take precedence over waiting for new events
        if !self.actions.is_empty() {
            self.running();
            return;
        }

        // Without a caller-supplied deadline, an entirely empty scheduler
        // waits with a deadline of "now" instead of waiting indefinitely
        let immediate = self.is_empty().then(Instant::now);
        self.waiting(deadline.or(immediate));
    }

    /// Handles the steps of everything the timer queue currently hands out.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "debug", skip_all)
    )]
    fn process_timers(&mut self) {
        while let Some((origin, steps)) = self.timers.take() {
            // Only module and node are needed to address the handler
            let token = Token {
                module: origin.module,
                node: origin.node,
            };
            self.handle(token, steps);
        }
    }

    /// Handles the results of everything the task queue currently hands out,
    /// then updates the task queue.
    ///
    /// # Panics
    ///
    /// Panics if a task result is an error, since results are unwrapped.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "debug", skip_all)
    )]
    fn process_tasks(&mut self) {
        while let Some((origin, res)) = self.tasks.take() {
            let token = Token {
                module: origin.module,
                node: origin.node,
            };
            let steps = res.unwrap();
            self.handle(token, steps);
        }
        self.tasks.update();
    }

    /// Waits until a timer, a task or a router entry is ready, then reacts.
    ///
    /// With a deadline, the method returns without doing anything once the
    /// deadline passes; without one, it waits until something is ready.
    ///
    /// # Panics
    ///
    /// Panics if the timer and task receivers are not registered at select
    /// indices 0 and 1, which the dispatch below relies on.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "debug", skip_all)
    )]
    fn waiting(&mut self, deadline: Option<Instant>) {
        // Registration order fixes the indices: 0 = timers, 1 = tasks, and
        // everything from 2 onwards is added by the router
        let mut select = Select::new();
        assert_eq!(0, select.recv(self.timers.as_receiver()));
        assert_eq!(1, select.recv(self.tasks.as_receiver()));
        self.router.add_to_select(&mut select);

        let ready = match deadline {
            Some(deadline) => match select.ready_deadline(deadline) {
                Ok(index) => index,
                // Nothing became ready in time
                Err(_) => return,
            },
            None => select.ready(),
        };

        if ready == 0 {
            self.process_timers();
        } else if ready == 1 {
            self.process_tasks();
        } else {
            // Translate the select index back into the router's own index
            for token in self.router.poll(ready - 2) {
                self.actions.submit(token);
            }
        }
    }

    /// Runs pending actions until none are left, or until the task or timer
    /// queue reports that it is ready.
    ///
    /// # Panics
    ///
    /// Panics if an action yields an error, since results are unwrapped.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "debug", skip_all)
    )]
    fn running(&mut self) {
        while let Some((token, results)) = self.actions.take() {
            for res in results {
                let steps = res.unwrap();
                self.handle(token, steps);
            }

            // Yield as soon as a task or timer needs attention
            let interrupted = self.tasks.is_ready() || self.timers.is_ready();
            if interrupted {
                break;
            }
        }
    }

    /// Handles the given steps on behalf of the node identified by the token.
    ///
    /// Each step's effect is dispatched: `Then` resumes the action and
    /// recursively handles whatever it yields, `Timer` and `Task` are
    /// submitted to their queues, and `Done` completes the scope.
    ///
    /// # Panics
    ///
    /// Panics if a resumed action yields an error, or if the scope of a
    /// timer or task step has no identifier.
    #[cfg_attr(
        feature = "tracing",
        tracing::instrument(level = "trace", skip_all)
    )]
    fn handle(&mut self, token: Token, steps: Steps<I>) {
        for Step { scoped, effect } in steps {
            let scoped = self.actions.ensure(token, &scoped);
            match effect {
                Effect::Then(then) => {
                    let (resumed, inner) = self.actions.resume(token, then);
                    for res in inner {
                        self.handle(resumed, res.unwrap());
                    }
                }
                Effect::Timer(timer) => {
                    let origin = TokenFull {
                        module: token.module,
                        node: token.node,
                        frontier: scoped.id().expect("invariant"),
                    };
                    self.timers.submit(origin, timer);
                }
                Effect::Task(task) => {
                    let origin = TokenFull {
                        module: token.module,
                        node: token.node,
                        frontier: scoped.id().expect("invariant"),
                    };
                    self.tasks.submit(origin, task);
                }
                Effect::Done => {
                    self.actions.complete(token, &scoped);
                }
            }
        }
    }
}
// The `must_use_candidate` lint is silenced for the whole impl instead of
// annotating each of the accessors below.
#[allow(clippy::must_use_candidate)]
impl<I, S> Scheduler<I, S>
where
    I: Id,
    S: Strategy,
{
    /// Returns the combined length of the router, the action store, the
    /// timer queue and the task queue.
    #[inline]
    pub fn len(&self) -> usize {
        let routed = self.router.len();
        let actions = self.actions.len();
        let timers = self.timers.len();
        let tasks = self.tasks.len();
        routed + actions + timers + tasks
    }

    /// Returns whether the router, the action store, the timer queue and
    /// the task queue are all empty.
    #[inline]
    pub fn is_empty(&self) -> bool {
        let occupied = !self.router.is_empty()
            || !self.actions.is_empty()
            || !self.timers.is_empty()
            || !self.tasks.is_empty();
        !occupied
    }
}
impl<I, S> Default for Scheduler<I, S>
where
    I: Id,
    S: Strategy + Default,
{
    /// Creates a scheduler on an executor built from the default strategy.
    #[inline]
    fn default() -> Self {
        let strategy = S::default();
        let executor = Executor::new(strategy);
        Self::new(executor)
    }
}