use std::time::{Duration, Instant};
use zrx_diagnostic::report::Report;
use zrx_executor::strategy::WorkSharing;
use zrx_executor::{self, Strategy};
pub mod action;
pub mod effect;
mod executor;
pub mod graph;
pub mod id;
pub mod session;
mod tick;
pub mod value;
use action::{Output, Outputs};
use executor::queue::{Tasks, Timers};
use executor::{Executor, Token};
use graph::Graph;
use id::Id;
use session::{Connector, Message, Result, Session, Sessions};
use tick::Tick;
use value::Value;
#[derive(Debug)]
pub struct Scheduler<I, S>
where
I: Id,
S: Strategy,
{
executor: Executor<I>,
connector: Connector<I>,
sessions: Sessions,
tasks: Tasks<I, S>,
timers: Timers<I>,
total: usize,
}
impl<I> Scheduler<I, WorkSharing>
where
I: Id,
{
#[must_use]
pub fn new(meta: Graph<I>) -> Self {
Self::with_executor(meta, zrx_executor::Executor::default())
}
}
impl<I, S> Scheduler<I, S>
where
I: Id,
S: Strategy,
{
#[must_use]
pub fn with_executor(
meta: Graph<I>, executor: zrx_executor::Executor<S>,
) -> Self {
Self {
executor: Executor::new(meta.actions),
connector: Connector::new(),
sessions: Sessions::new(meta.sources),
tasks: Tasks::new(executor),
timers: Timers::new(),
total: 0,
}
}
#[inline]
pub fn session<T>(&mut self) -> Result<Session<I, T>>
where
T: Value,
{
let session = self.connector.session();
self.sessions.insert::<T>(session.id()).map(|()| session)
}
#[inline]
pub fn tick(&mut self) -> Report {
Tick::new(None).run(self)
}
#[inline]
pub fn tick_deadline(&mut self, deadline: Instant) -> Report {
Tick::new(Some(deadline)).run(self)
}
#[inline]
pub fn tick_timeout(&mut self, timeout: Duration) -> Report {
Tick::new(Some(Instant::now() + timeout)).run(self)
}
fn handle_message(&mut self, message: Message<I>) {
match message {
Message::Item(id, item) => {
self.executor.submit(item, self.sessions.get(id));
}
Message::Drop(id) => {
self.sessions.remove(id);
}
}
}
fn handle(&mut self, token: Token, outputs: Outputs<I>) {
let mut items = Vec::new();
let has_outputs = !outputs.is_empty();
for output in outputs {
match output {
Output::Item(item) => items.push(item),
Output::Task(task) => self.tasks.submit(token, task),
Output::Timer(timer) => self.timers.submit(token, timer),
}
}
if !has_outputs || !items.is_empty() {
self.executor.update(token, items);
}
}
}
#[allow(clippy::must_use_candidate)]
impl<I, S> Scheduler<I, S>
where
I: Id,
S: Strategy,
{
#[inline]
pub fn len(&self) -> usize {
self.executor.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.executor.is_empty() && self.connector.is_empty()
}
#[inline]
pub fn total(&self) -> usize {
self.total
}
}