use crossbeam::channel::{at, never};
use crossbeam::select;
use std::error::Error;
use std::marker::PhantomData;
use std::time::Instant;
use zrx_diagnostic::report::Report;
use zrx_executor::Strategy;
use super::action::Outputs;
use super::executor::ToReceiver;
use super::{Id, Scheduler};
/// State carried through a single scheduler tick.
///
/// A tick is created with [`Tick::new`], driven once with [`Tick::run`]
/// (which consumes it), and yields the [`Report`] accumulated on the way.
/// The type parameters `I` and `S` are not stored; they only tie the tick to
/// the `Scheduler<I, S>` it is run against.
#[derive(Debug)]
pub struct Tick<I, S> {
/// Report that successful task results are merged into; returned by `run`.
report: Report,
/// Optional point in time used as the timeout when the tick has to block
/// waiting for events; `None` means no explicit deadline was requested.
deadline: Option<Instant>,
/// Zero-sized marker binding the otherwise unused `I` and `S` parameters.
marker: PhantomData<(I, S)>,
}
impl<I, S> Tick<I, S>
where
I: Id,
S: Strategy,
{
pub fn new(deadline: Option<Instant>) -> Self {
Self {
report: Report::new(()),
deadline,
marker: PhantomData,
}
}
#[inline]
pub fn run(mut self, scheduler: &mut Scheduler<I, S>) -> Report {
self.process(scheduler);
self.report
}
fn process(&mut self, scheduler: &mut Scheduler<I, S>) {
self.process_tasks(scheduler);
self.process_timers(scheduler);
if scheduler.executor.can_make_progress() {
self.running(scheduler);
} else {
self.waiting(scheduler);
}
}
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn process_tasks(&mut self, scheduler: &mut Scheduler<I, S>) {
while let Some((token, res)) = scheduler.tasks.take() {
match res {
Err(err) => {
handle_error(&err);
scheduler.handle(token, Outputs::default());
}
Ok(target) => {
scheduler.handle(token, self.report.merge(target));
}
}
}
scheduler.tasks.update();
}
#[allow(clippy::unused_self)]
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all))]
fn process_timers(&mut self, scheduler: &mut Scheduler<I, S>) {
while let Some((token, outputs)) = scheduler.timers.take() {
scheduler.handle(token, outputs);
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "debug", skip_all)
)]
fn running(&mut self, scheduler: &mut Scheduler<I, S>) {
let mut max = 0;
while let Some(message) = scheduler.connector.take() {
scheduler.handle_message(message);
scheduler.total += 1;
max += 1;
if max >= 16 {
break;
}
}
let mut max = 0;
for (token, res) in scheduler.executor.take() {
match res {
Err(err) => {
handle_error(&err);
scheduler.handle(token, Outputs::default());
}
Ok(target) => {
scheduler.handle(token, self.report.merge(target));
}
}
max += 1;
if max >= 16 {
break;
}
}
}
#[cfg_attr(
feature = "tracing",
tracing::instrument(level = "debug", skip_all)
)]
fn waiting(&mut self, scheduler: &mut Scheduler<I, S>) {
let deadline = self.deadline(scheduler);
select! {
recv(deadline.map_or_else(never, at)) -> _ => {}
recv(scheduler.connector.to_receiver()) -> res => {
let message = res.expect("invariant");
scheduler.handle_message(message);
}
recv(scheduler.tasks.to_receiver()) -> res => {
let (token, res) = res.expect("invariant");
match res {
Err(err) => {
handle_error(&err);
scheduler.handle(token, Outputs::default());
}
Ok(target) => {
scheduler.handle(token, self.report.merge(target));
}
}
}
recv(scheduler.timers.to_receiver()) -> _ => {
self.process_timers(scheduler);
}
};
}
fn deadline(&mut self, scheduler: &mut Scheduler<I, S>) -> Option<Instant> {
self.deadline.or((scheduler.connector.is_empty()
&& scheduler.tasks.is_empty()
&& scheduler.timers.is_empty())
.then(Instant::now))
}
}
/// Prints `err` followed by its chain of sources to standard output.
///
/// Each error occupies one line. The outermost error is prefixed with
/// `Error: `, every source below it with `Caused by: `, and each level is
/// indented by two more spaces than the previous one.
// NOTE(review): this writes to stdout rather than stderr - confirm that
// this is the intended destination for error output.
fn handle_error(err: &dyn Error) {
    // Walk from the outermost error down through `source()` until it ends.
    let chain = std::iter::successors(Some(err), |&error| error.source());
    for (depth, error) in chain.enumerate() {
        let prefix = match depth {
            0 => "Error: ",
            _ => "Caused by: ",
        };
        println!("{:indent$}{}{}", "", prefix, error, indent = depth * 2);
    }
}