use crossbeam::channel::{bounded, Receiver, Sender};
use std::borrow::Cow;
use std::collections::VecDeque;
use std::panic::AssertUnwindSafe;
use zrx_executor::strategy::WorkSharing;
use zrx_executor::{self as executor, Error, Executor, Strategy};
use crate::scheduler::action::{Outputs, Result};
use crate::scheduler::effect::Task;
use super::{ToReceiver, Token};
/// Set of tasks that are run on an [`Executor`], with their results being
/// collected through a bounded channel.
#[derive(Debug)]
pub struct Tasks<I, S>
where
S: Strategy,
{
/// Executor that tasks are submitted to.
executor: Executor<S>,
/// Tasks the executor rejected on submission, kept for resubmission.
queue: VecDeque<Box<dyn executor::Task>>,
/// Sending end of the result channel, cloned into every submitted task.
sender: Sender<Job<I>>,
/// Receiving end of the result channel, from which finished jobs are taken.
receiver: Receiver<Job<I>>,
}
impl<I, S> Tasks<I, S>
where
    S: Strategy,
{
    /// Creates a task set on top of the given executor.
    ///
    /// The result channel is sized to the capacity reported by the executor,
    /// falling back to eight slots per worker if the executor reports none.
    #[must_use]
    pub fn new(executor: Executor<S>) -> Self {
        let capacity =
            executor.capacity().unwrap_or(8 * executor.num_workers());
        Self::with_capacity(executor, capacity)
    }

    /// Creates a task set on top of the given executor, using a result
    /// channel that is bounded to `capacity` jobs.
    #[must_use]
    pub fn with_capacity(executor: Executor<S>, capacity: usize) -> Self {
        let (sender, receiver) = bounded(capacity);
        Self {
            executor,
            queue: VecDeque::new(),
            sender,
            receiver,
        }
    }

    /// Submits a task for execution, tagging its result with `token`.
    ///
    /// The task is wrapped in a closure that executes it and sends the pair
    /// of token and result through the result channel. If the executor
    /// rejects the submission, the wrapped task is appended to the local
    /// queue, from where [`Tasks::update`] tries to submit it again.
    ///
    /// # Panics
    ///
    /// Panics if the executor returns [`Error::Signal`], which is treated as
    /// a broken invariant.
    pub fn submit(&mut self, token: Token, task: Task<I>)
    where
        I: Send + 'static,
    {
        let sender = self.sender.clone();
        // NOTE(review): presumably the executor requires tasks to be unwind
        // safe, which is asserted here rather than proven - confirm against
        // the `executor::Task` trait.
        let job = AssertUnwindSafe(move || {
            // A failed send means no receiver is left to pick up the result,
            // so the result is deliberately discarded.
            let _ = sender.send((token, task.execute()));
        });

        match self.executor.submit(job) {
            Ok(()) => (),
            // The executor hands the task back - keep it for resubmission.
            Err(Error::Submit(task)) => {
                self.queue.push_back(task);
            }
            Err(Error::Signal) => panic!("invariant"),
        }
    }

    /// Resubmits queued tasks to the executor, front to back.
    ///
    /// Resubmission stops at the first task the executor rejects. That task
    /// is put back at the front of the queue, so the order of queued tasks
    /// is preserved.
    ///
    /// Returns `None` if the queue was empty on entry, and the number of
    /// tasks the executor accepted otherwise, which may be `0`.
    ///
    /// # Panics
    ///
    /// Panics if the executor returns [`Error::Signal`], mirroring the
    /// behavior of [`Tasks::submit`]. Ignoring this error would silently drop
    /// the task while still counting it as submitted.
    pub fn update(&mut self) -> Option<usize> {
        (!self.queue.is_empty()).then(|| {
            let mut count = 0;
            while let Some(task) = self.queue.pop_front() {
                match self.executor.submit(task) {
                    Ok(()) => count += 1,
                    // The executor is still saturated - retry on next update.
                    Err(Error::Submit(task)) => {
                        self.queue.push_front(task);
                        break;
                    }
                    Err(Error::Signal) => panic!("invariant"),
                }
            }
            count
        })
    }

    /// Takes the next finished job off the result channel, if any.
    ///
    /// This method does not block, and returns `None` if no job could be
    /// received at the time of the call.
    #[inline]
    pub fn take(&self) -> Option<Job<I>> {
        self.receiver.try_recv().ok()
    }
}
#[allow(clippy::must_use_candidate)]
impl<I, S> Tasks<I, S>
where
    S: Strategy,
{
    /// Returns whether there is no work left in this task set.
    ///
    /// This is only the case if the executor reports being empty, the result
    /// channel holds no pending jobs, and no tasks are waiting in the local
    /// queue. The checks are made in exactly this order, and stop at the
    /// first one that fails.
    #[inline]
    pub fn is_empty(&self) -> bool {
        if !self.executor.is_empty() {
            return false;
        }
        if !self.receiver.is_empty() {
            return false;
        }
        self.queue.is_empty()
    }
}
impl<I, S> ToReceiver<I> for Tasks<I, S>
where
    S: Strategy,
{
    type Item = Job<I>;

    /// Lends out the receiving end of the result channel by reference, so
    /// that the receiver is not cloned.
    #[inline]
    fn to_receiver(&self) -> Cow<'_, Receiver<Self::Item>> {
        let receiver = &self.receiver;
        Cow::Borrowed(receiver)
    }
}
impl<I> Default for Tasks<I, WorkSharing> {
#[inline]
fn default() -> Self {
Self::new(Executor::default())
}
}
/// Result of an executed task, paired with the token it was submitted with.
pub type Job<I> = (Token, Result<Outputs<I>>);