use super::CancelableWorker;
use crate::{
State,
internal::HeapBuffer,
worker_traits::{WorkerInit, WorkerMethods},
};
use std::num::NonZeroUsize;
/// A wrapper around [`CancelableWorker`] that tags every task with a running
/// index and routes the tagged results through a [`HeapBuffer`].
///
/// NOTE(review): the index/buffer pairing is presumably what lets results be
/// handed back in the order their tasks were added — confirm against
/// `HeapBuffer`.
pub struct OrderedCancelableWorker<T: Send + 'static, R: Send + 'static> {
    /// Underlying worker; it receives `(index, task)` pairs and yields
    /// `(index, result)` pairs.
    inner: CancelableWorker<(NonZeroUsize, T), (NonZeroUsize, R)>,
    /// Buffer that every result coming from `inner` is passed through,
    /// together with its index, before being returned to the caller.
    result_heap: HeapBuffer<R>,
    /// Index that will be attached to the next task added.
    task_indx: NonZeroUsize,
}
impl<T: Send + 'static, R: Send + 'static> WorkerMethods<T, R> for OrderedCancelableWorker<T, R> {
    /// Tags `task` with the current task index, forwards it to the inner
    /// worker and then advances the index (saturating at `usize::MAX`).
    fn add_task(&mut self, task: T) {
        let indx = self.task_indx;
        self.inner.add_task((indx, task));
        self.task_indx = indx.saturating_add(1);
    }

    /// Forwards all `tasks` to the inner worker, tagging each one with a
    /// consecutive index as the iterator is consumed.
    fn add_tasks(&mut self, tasks: impl IntoIterator<Item = T>) {
        // Borrow only the counter so `self.inner` stays free for the call below.
        let next_indx = &mut self.task_indx;
        let tagged_tasks = tasks.into_iter().map(|task| {
            let indx = *next_indx;
            *next_indx = indx.saturating_add(1);
            (indx, task)
        });
        self.inner.add_tasks(tagged_tasks);
    }

    /// Resets the next task index to the result buffer's current index and
    /// then cancels the tasks of the inner worker.
    fn cancel_tasks(&mut self) {
        // NOTE(review): presumably this makes newly added tasks continue from
        // the index the buffer expects next — confirm against `HeapBuffer`.
        let resume_indx = self.result_heap.current_indx();
        self.task_indx = resume_indx;
        self.inner.cancel_tasks();
    }

    /// Returns the next result via `get_in_order`, polling the inner worker
    /// with its `get` method.
    fn get(&mut self) -> Option<R> {
        self.get_in_order(|worker| worker.get())
    }

    /// Returns the next result via `get_in_order`, polling the inner worker
    /// with its `get_blocking` method.
    fn get_blocking(&mut self) -> Option<R> {
        self.get_in_order(|worker| worker.get_blocking())
    }

    /// Number of pending tasks as reported by the inner worker.
    fn pending_tasks(&self) -> usize {
        let pending = self.inner.pending_tasks();
        pending
    }
}
impl<T: Send + 'static, R: Send + 'static> OrderedCancelableWorker<T, R> {
    /// Fetches the next result, going through the result buffer.
    ///
    /// First asks the buffer for a result it can hand out right away. If it
    /// has none, `get_function` is called repeatedly on the inner worker and
    /// each `(index, result)` pair is passed to the buffer's
    /// `store_or_return`; the first value that call gives back is returned.
    /// Returns `None` as soon as `get_function` itself returns `None`.
    fn get_in_order(
        &mut self,
        get_function: impl Fn(
            &mut CancelableWorker<(NonZeroUsize, T), (NonZeroUsize, R)>,
        ) -> Option<(NonZeroUsize, R)>,
    ) -> Option<R> {
        let buffered = self.result_heap.get();
        if buffered.is_some() {
            return buffered;
        }

        loop {
            // `?` ends the search once the inner worker has nothing to offer.
            let (indx, result) = get_function(&mut self.inner)?;
            match self.result_heap.store_or_return(result, indx) {
                Some(ready) => return Some(ready),
                None => continue,
            }
        }
    }
}
impl<T, R, F> WorkerInit<T, R, F> for OrderedCancelableWorker<T, R>
where
    T: Send + 'static,
    R: Send + 'static,
    F: Fn(T, &State) -> Option<R> + Send + Copy + 'static,
{
    /// Builds the worker with `num_worker_threads` threads.
    ///
    /// `worker_function` is wrapped so that the index attached to each task
    /// is carried over to its result; a `None` from `worker_function` stays
    /// `None`. The task index starts at `NonZeroUsize::MIN` and the result
    /// buffer starts out freshly constructed.
    fn with_num_threads(num_worker_threads: usize, worker_function: F) -> Self {
        Self {
            inner: CancelableWorker::with_num_threads(
                num_worker_threads,
                move |(indx, task), state| {
                    // Re-attach the task's index to whatever the user function produced.
                    let result = worker_function(task, state)?;
                    Some((indx, result))
                },
            ),
            result_heap: HeapBuffer::new(),
            task_indx: NonZeroUsize::MIN,
        }
    }
}