use crossbeam::channel::{bounded, Receiver, Sender};
use std::collections::VecDeque;
use std::fmt::Debug;
use std::panic::AssertUnwindSafe;
use zrx_executor::strategy::WorkSharing;
use zrx_executor::{self as executor, Error, Executor, Strategy};
use crate::scheduler::engine::{AsReceiver, TokenFull};
use crate::scheduler::signal::Id;
use crate::scheduler::step::effect::Task;
use crate::scheduler::step::{Result, Steps};
pub struct Tasks<I, S>
where
S: Strategy,
{
executor: Executor<S>,
queue: VecDeque<Box<dyn executor::Task>>,
sender: Sender<Item<I>>,
receiver: Receiver<Item<I>>,
}
impl<I, S> Tasks<I, S>
where
S: Strategy,
{
#[must_use]
pub fn new(executor: Executor<S>) -> Self {
let (sender, receiver) = bounded(executor.capacity());
Self {
executor,
queue: VecDeque::new(),
sender,
receiver,
}
}
pub fn submit(&mut self, token: TokenFull, task: Task<I>)
where
I: Id,
{
match self.executor.submit({
let sender = self.sender.clone();
AssertUnwindSafe(move || {
let _ = sender.send((token, task.execute()));
})
}) {
Ok(()) => (),
Err(Error::Submit(task)) => {
self.queue.push_back(task);
}
Err(Error::Signal) => panic!("unrecoverable error"),
}
}
pub fn update(&mut self) -> Option<usize> {
(!self.queue.is_empty()).then(|| {
let mut count = 0;
while let Some(task) = self.queue.pop_front() {
if let Err(Error::Submit(task)) = self.executor.submit(task) {
self.queue.push_front(task);
break;
}
count += 1;
}
count
})
}
#[inline]
#[must_use]
pub fn take(&self) -> Option<Item<I>> {
self.receiver.try_recv().ok()
}
}
#[allow(clippy::must_use_candidate)]
impl<I, S> Tasks<I, S>
where
S: Strategy,
{
#[inline]
pub fn len(&self) -> usize {
self.executor.len() + self.receiver.len() + self.queue.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.executor.is_empty()
&& self.receiver.is_empty()
&& self.queue.is_empty()
}
#[inline]
pub fn is_ready(&self) -> bool {
!self.receiver.is_empty()
}
}
impl<I, S> AsReceiver<I> for Tasks<I, S>
where
S: Strategy,
{
type Item = Item<I>;
#[inline]
fn as_receiver(&self) -> &Receiver<Self::Item> {
&self.receiver
}
}
impl<I> Default for Tasks<I, WorkSharing> {
#[inline]
fn default() -> Self {
Self::new(Executor::default())
}
}
impl<I, S> Debug for Tasks<I, S>
where
S: Strategy,
{
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Tasks")
.field("executor", &self.executor)
.field("queue", &self.queue.len())
.finish_non_exhaustive()
}
}
pub type Item<I> = (TokenFull, Result<Steps<I>>);