#![deny(warnings)]
#![warn(missing_docs)]
#![warn(clippy::missing_docs_in_private_items)]
use arc_swap::*;
use fxhash::*;
use priority_queue::priority_queue::*;
use private::*;
use std::cell::*;
use std::collections::*;
use std::future::*;
use std::hash::*;
use std::marker::*;
use std::mem::*;
use std::ops::*;
use std::sync::atomic::*;
use std::sync::*;
use std::task::*;
use takecell::*;
/// Re-exports the synchronization primitives appropriate for the compile
/// target: `wasm_sync` on `wasm32`, `std::sync` everywhere else.
/// NOTE(review): presumably `wasm_sync` is needed because `std`'s blocking
/// primitives are unavailable on the web target — confirm.
mod sync_impl {
    #[cfg(target_arch = "wasm32")]
    pub use wasm_sync::*;
    #[cfg(not(target_arch = "wasm32"))]
    pub use std::sync::*;
}
/// A source of executable work units, consumable by a [`TaskPool`].
pub trait WorkProvider: 'static + Send + Sync {
    /// Gets the notifier used to signal that new work may be available.
    fn change_notifier(&self) -> &ChangeNotifier;
    /// Takes the next available unit of work, or `None` if there is
    /// currently nothing to do.
    fn next_task(&self) -> Option<Box<dyn '_ + WorkUnit>>;
}
/// A [`WorkProvider`] that draws work from multiple child providers in the
/// order they were added.
#[derive(Default)]
pub struct ChainedWorkProvider {
    /// Aggregated notifier fired whenever any child provider signals a change.
    notifier: Arc<ChangeNotifier>,
    /// The chained providers together with the listener handles that keep
    /// their notification forwarding alive.
    providers: Vec<ChainedWorkProviderEntry>,
}
impl ChainedWorkProvider {
pub fn with(mut self, provider: impl WorkProvider) -> Self {
let notifier_cloned = self.notifier.clone();
let listener = provider
.change_notifier()
.add_listener(move || notifier_cloned.notify());
self.providers.push(ChainedWorkProviderEntry {
listener,
provider: Box::new(provider),
});
self
}
}
impl WorkProvider for ChainedWorkProvider {
    /// Gets the notifier that aggregates all chained providers' signals.
    fn change_notifier(&self) -> &ChangeNotifier {
        &self.notifier
    }
    /// Polls each chained provider in registration order and returns the
    /// first available unit of work, or `None` if every provider is idle.
    fn next_task(&self) -> Option<Box<dyn '_ + WorkUnit>> {
        self.providers
            .iter()
            .find_map(|entry| entry.provider.next_task())
    }
}
impl std::fmt::Debug for ChainedWorkProvider {
    /// Formats by type name only; the contained providers are opaque.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChainedWorkProvider").finish()
    }
}
/// An entry in a [`ChainedWorkProvider`]: a child provider plus the listener
/// handle that forwards its change notifications.
#[allow(dead_code)]
struct ChainedWorkProviderEntry {
    /// Keeps the forwarding callback registered; dropping it would silence
    /// the child provider's notifications.
    pub listener: ChangeNotificationListener,
    /// The wrapped child provider.
    pub provider: Box<dyn WorkProvider>,
}
/// A single executable unit of work.
pub trait WorkUnit {
    /// Consumes and runs this unit of work.
    fn execute(self: Box<Self>);
}
/// Any `FnOnce()` closure can serve directly as a work unit.
impl<F: FnOnce()> WorkUnit for F {
    fn execute(self: Box<Self>) {
        self();
    }
}
/// Broadcasts change notifications to a dynamic set of listener callbacks.
#[derive(Default)]
pub struct ChangeNotifier {
    /// Weak references to registered callbacks; entries whose strong handle
    /// ([`ChangeNotificationListener`]) has been dropped are skipped during
    /// notification and purged on the next registration.
    listeners: sync_impl::RwLock<Vec<Weak<dyn Fn() + Send + Sync>>>,
}
impl ChangeNotifier {
    /// Invokes every listener that is still alive.
    ///
    /// Dead listeners (whose [`ChangeNotificationListener`] handle has been
    /// dropped) are skipped here and physically removed lazily on the next
    /// call to [`ChangeNotifier::add_listener`].
    pub fn notify(&self) {
        for listener in &*self.listeners.read().expect("Could not acquire read lock.") {
            if let Some(to_execute) = listener.upgrade() {
                to_execute();
            }
        }
    }
    /// Registers a new listener callback.
    ///
    /// The returned handle holds the sole strong reference to the callback;
    /// dropping the handle deregisters the listener.
    pub fn add_listener(
        &self,
        listener: impl 'static + Fn() + Send + Sync,
    ) -> ChangeNotificationListener {
        let mut listeners = self
            .listeners
            .write()
            .expect("Could not acquire write lock.");
        Self::clear_dead_listeners(&mut listeners);
        let result = Arc::new(listener) as Arc<dyn Fn() + Send + Sync>;
        listeners.push(Arc::downgrade(&result));
        ChangeNotificationListener(result)
    }
    /// Removes all weak references whose strong count has dropped to zero.
    ///
    /// `Vec::retain` performs the same order-preserving, in-place compaction
    /// that was previously hand-rolled with `set_len`/`spare_capacity_mut`,
    /// but safely (and without leaking elements if a destructor panics).
    fn clear_dead_listeners(listeners: &mut Vec<Weak<dyn Fn() + Send + Sync>>) {
        listeners.retain(|listener| listener.strong_count() > 0);
    }
}
impl std::fmt::Debug for ChangeNotifier {
    /// Formats by type name only; listeners are opaque closures.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ChangeNotifier").finish()
    }
}
/// Handle returned by [`ChangeNotifier::add_listener`]; the listener stays
/// registered only while this handle (the sole strong reference to the
/// callback) is alive.
#[allow(dead_code)]
pub struct ChangeNotificationListener(Arc<dyn Fn() + Send + Sync>);
impl std::fmt::Debug for ChangeNotificationListener {
    /// Formats by type name only; the callback is an opaque closure.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("ChangeNotificationListener").finish()
    }
}
/// A set of worker threads that repeatedly pull and execute work from a
/// [`WorkProvider`].
#[allow(dead_code)]
pub struct TaskPool {
    /// The provider change-listener handle plus the state shared with the
    /// worker threads; `None` only after [`TaskPool::forget`] has taken it.
    change_listener_inner: Option<(ChangeNotificationListener, Arc<TaskPoolInner>)>,
}
impl TaskPool {
    /// Creates a pool of `threads` OS threads executing work from `provider`.
    pub fn new(provider: impl WorkProvider, threads: usize) -> Self {
        Self::with_spawner(provider, threads, |_, f| {
            std::thread::spawn(f);
        })
    }
    /// Creates a pool whose worker loops are launched by `spawner`, which
    /// receives each worker's index and the closure to run. This allows
    /// embedding the pool into custom threading environments (e.g. web
    /// workers) instead of `std::thread`.
    pub fn with_spawner(
        provider: impl WorkProvider,
        threads: usize,
        mut spawner: impl FnMut(usize, Box<dyn 'static + FnOnce() + Send>),
    ) -> Self {
        let inner = Arc::new(TaskPoolInner::new(provider));
        for id in 0..threads {
            let inner_clone = inner.clone();
            spawner(id, Box::new(move || inner_clone.run()));
        }
        let inner_clone = inner.clone();
        // Wake sleeping workers whenever the provider reports new work.
        let change_listener = inner
            .provider()
            .change_notifier()
            .add_listener(move || inner_clone.notify_changed());
        Self {
            change_listener_inner: Some((change_listener, inner)),
        }
    }
    /// Leaks the pool's change listener so the workers keep running for the
    /// rest of the program instead of being stopped when the handle drops.
    ///
    /// Safe replacement for the previous `unwrap_unchecked`: the field is
    /// always `Some` until consumed here, and `forget` takes `self` by value.
    pub fn forget(mut self) {
        let (listener, _inner) = self
            .change_listener_inner
            .take()
            .expect("TaskPool state was already taken.");
        // Leak only the listener; dropping the now-`None` field in `Drop`
        // will skip the stop signal.
        forget(listener);
    }
}
impl Drop for TaskPool {
    /// Signals all worker threads to stop when the pool handle is dropped
    /// (unless the pool was leaked via [`TaskPool::forget`], in which case
    /// the field is `None` and nothing happens).
    fn drop(&mut self) {
        if let Some((_, inner)) = &self.change_listener_inner {
            inner.stop();
        }
    }
}
/// State shared between a [`TaskPool`] handle and its worker threads.
struct TaskPoolInner {
    /// The source of work units for all workers.
    work_provider: Box<dyn WorkProvider>,
    /// Generation counter doubling as a state word: 0 = stopped; positive =
    /// running generation; negative = workers are parked awaiting a change.
    task_counter: AtomicI32,
    /// Signalled when the counter changes while workers are parked.
    on_change: sync_impl::Condvar,
    /// Mutex paired with `on_change`; serializes counter transitions.
    lock: sync_impl::Mutex<()>,
}
impl TaskPoolInner {
    /// Creates shared pool state wrapping `provider`; the counter starts at
    /// 1 (running, first generation).
    pub fn new(provider: impl WorkProvider) -> Self {
        Self {
            work_provider: Box::new(provider),
            task_counter: AtomicI32::new(1),
            on_change: sync_impl::Condvar::default(),
            lock: sync_impl::Mutex::default(),
        }
    }
    /// Gets the pool's work provider.
    pub fn provider(&self) -> &dyn WorkProvider {
        &*self.work_provider
    }
    /// Called when the provider signals new work: bumps the generation
    /// counter and, if workers were parked (negative counter), flips it
    /// positive again and wakes them all.
    #[allow(unused_variables)]
    pub fn notify_changed(&self) {
        // The guard only serializes counter updates against
        // `wait_for_change`/`stop`; the protected data is the counter itself.
        let guard = self.lock.lock().expect("Could not acquire mutex.");
        let old_value = self.task_counter.load(Ordering::Acquire);
        if old_value.is_negative() {
            let mut new_value = -old_value + 1;
            // Wrap the generation back to 1 before it can reach i32::MAX so
            // negation in `wait_for_change` can never overflow.
            if new_value == i32::MAX - 1 {
                new_value = 1;
            }
            self.task_counter.store(new_value, Ordering::Release);
            self.on_change.notify_all();
        } else {
            let mut new_value = old_value + 1;
            if new_value == i32::MAX - 1 {
                new_value = 1;
            }
            self.task_counter.store(new_value, Ordering::Release);
        }
    }
    /// The worker loop: executes tasks until the counter reads 0 (stopped),
    /// parking on the condvar whenever the provider runs dry.
    pub fn run(&self) {
        loop {
            let task_value = self.task_counter.load(Ordering::Acquire);
            match task_value.cmp(&0) {
                // Negative: another worker already recorded "no work"; park
                // without flipping the counter again.
                std::cmp::Ordering::Less => self.wait_for_change::<false>(task_value),
                // Zero: the pool has been stopped.
                std::cmp::Ordering::Equal => return,
                std::cmp::Ordering::Greater => {
                    if let Some(unit) = self.work_provider.next_task() {
                        unit.execute();
                    } else {
                        // No work available: try to flip the counter negative
                        // (only if unchanged since the load) and park.
                        self.wait_for_change::<true>(task_value);
                    }
                }
            }
        }
    }
    /// Stops all workers by zeroing the counter and waking parked threads.
    #[allow(unused_variables)]
    pub fn stop(&self) {
        let guard = self.lock.lock().expect("Could not acquire mutex.");
        self.task_counter.store(0, Ordering::Release);
        self.on_change.notify_all();
    }
    /// Parks the current worker until the counter changes.
    ///
    /// With `FLIP_COUNTER`, the caller observed "no work" at generation
    /// `task_value` and negates the counter to record that workers are now
    /// sleeping — but only if the counter is still `task_value`, i.e. no
    /// notification arrived in between (compare-under-lock).
    fn wait_for_change<const FLIP_COUNTER: bool>(&self, task_value: i32) {
        let guard = self.lock.lock().expect("Could not acquire mutex.");
        let mut new_value = self.task_counter.load(Ordering::Acquire);
        if FLIP_COUNTER && new_value == task_value {
            new_value = -new_value;
            self.task_counter.store(new_value, Ordering::Release);
        }
        if new_value.is_negative() {
            // A spurious wakeup is harmless: the worker loop re-reads the
            // counter and simply parks again if nothing changed.
            drop(self.on_change.wait(guard));
        }
    }
}
/// A source of work units belonging to a single spawned task.
pub trait TaskProvider: 'static + Send + Sync {
    /// Takes the next unit of work, or `None` when the collection has no
    /// further units to hand out.
    fn next_task(&self) -> Option<Box<dyn WorkUnit>>;
}
/// A [`TaskProvider`] whose completed work produces a value of type `T`.
pub trait TaskCollection<T>: TaskProvider + Sized {
    /// Takes the collection's result; expected to be called only after all
    /// units have completed.
    fn result(&self) -> T;
}
/// A handle to a spawned task collection; yields a `T` and may be joined
/// synchronously or awaited as a [`Future`].
#[derive(Debug)]
pub struct Task<T, B: QueueBacking> {
    /// Shared control block; `ManuallyDrop` because `drop`/`forget` manage
    /// its destruction explicitly.
    control: ManuallyDrop<Arc<TaskControl>>,
    /// Type-erased `TaskCollection::result` function pointer, invoked with
    /// the collection pointer recovered from `control`.
    result: fn(*const ()) -> T,
    /// Keeps the owning queue's shared state alive while the handle exists.
    backing: ManuallyDrop<Arc<TaskQueueHolder<B>>>,
}
impl<T, B: QueueBacking> Task<T, B> {
    /// Wraps `provider` in a control block tied to `backing`.
    ///
    /// SAFETY: the stored function pointer is `C::result` transmuted from
    /// `fn(&C) -> T` to `fn(*const ()) -> T`; it is only ever invoked in
    /// `get_result` with the data pointer of this same `C`.
    fn new<C: TaskCollection<T>>(provider: C, backing: Arc<TaskQueueHolder<B>>) -> Self {
        unsafe {
            let control = ManuallyDrop::new(Arc::new(TaskControl::new(provider)));
            let result = transmute(C::result as fn(&C) -> T);
            Self {
                control,
                result,
                backing: ManuallyDrop::new(backing),
            }
        }
    }
    /// Clones out the shared control block.
    fn control(&self) -> Arc<TaskControl> {
        (*self.control).clone()
    }
    /// Drops the handle without cancelling the task, letting it run to
    /// completion (detached) in the background.
    pub fn forget(mut self) {
        unsafe {
            // Release both manually-managed fields, then skip `Drop` (which
            // would cancel the task).
            ManuallyDrop::drop(&mut self.backing);
            ManuallyDrop::drop(&mut self.control);
            forget(self);
        }
    }
    /// Checks whether the task has finished executing.
    pub fn complete(&self) -> bool {
        self.control.complete()
    }
    /// Returns the result if complete, or gives the handle back otherwise.
    pub fn result(self) -> Result<T, Self> {
        if self.complete() {
            Ok(self.join())
        } else {
            Err(self)
        }
    }
    /// Blocks until the task completes — helping by executing its work units
    /// on the current thread — and returns the result.
    pub fn join(self) -> T {
        unsafe {
            while let Some(work) = self.control.collection().next_task() {
                work.execute();
            }
            // All units have been handed out; set the cancel flag so the
            // only remaining progress is units already in flight elsewhere.
            self.control.cancel();
            if self.complete() {
                self.get_result()
            } else {
                // Other threads still hold units: park on a condvar-backed
                // waker until the last one finishes and fires it.
                let waker = CondvarWaker::default();
                let guard = waker.lock.lock().unwrap_unchecked();
                self.control.set_result_waker(waker.as_waker());
                drop(waker.on_wake.wait_while(guard, |()| !self.complete()));
                self.get_result()
            }
        }
    }
    /// Executes this task's remaining work units on the current thread
    /// without blocking for completion of units held elsewhere.
    pub fn join_work(&self) {
        while let Some(work) = self.control.collection().next_task() {
            work.execute();
        }
    }
    /// Invokes the type-erased result getter on the collection.
    ///
    /// SAFETY: the caller must ensure the task is complete. The transmute
    /// splits the `&dyn TaskProvider` fat pointer into (data, vtable) and
    /// feeds the data half to the `fn(&C) -> T` stored in `new`.
    unsafe fn get_result(&self) -> T {
        (self.result)(transmute::<_, (*const (), *const ())>(self.control.collection()).0)
    }
}
impl<T, P: Ord + Send> Task<T, Priority<P>> {
    /// Changes this task's priority within its owning priority queue.
    ///
    /// The queue entry is found by control-block pointer identity via
    /// `PriorityHolder`; a no-op if the task is no longer queued.
    pub fn set_priority(&mut self, priority: P) {
        unsafe {
            // NOTE(review): `unwrap_unchecked` assumes the mutex is never
            // poisoned — confirm that invariant holds for all lock users.
            self.backing
                .inner
                .lock()
                .unwrap_unchecked()
                .queued
                .inner
                .change_priority(&PriorityHolder(self.control()), priority);
        }
    }
}
impl<T, B: QueueBacking> Future for Task<T, B> {
    type Output = T;
    /// Completes with the task's result once all work units have executed.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        unsafe {
            if self.complete() {
                Poll::Ready(self.get_result())
            } else {
                self.control.set_result_waker(cx.waker().clone());
                // Re-check after publishing the waker: the last in-flight
                // unit may have finished between the first check and the
                // store, in which case its completion read the *previous*
                // waker slot and the one just installed would never fire
                // (lost-wakeup race). The re-check closes that window.
                if self.complete() {
                    Poll::Ready(self.get_result())
                } else {
                    Poll::Pending
                }
            }
        }
    }
}
impl<T, B: QueueBacking> Drop for Task<T, B> {
    /// Cancels the task (no further units will be handed out) and releases
    /// the manually-managed control and queue references.
    fn drop(&mut self) {
        unsafe {
            self.control.cancel();
            ManuallyDrop::drop(&mut self.backing);
            ManuallyDrop::drop(&mut self.control);
        }
    }
}
/// A [`Waker`] implementation backed by a mutex/condvar pair, used by
/// [`Task::join`] to block the current thread until completion.
#[derive(Clone, Default)]
struct CondvarWaker {
    /// Shared lock + condvar state; boxed clones of this `Arc` serve as the
    /// data pointers of the raw waker instances.
    inner: Arc<CondvarWakerInner>,
}
impl CondvarWaker {
    /// Builds a `Waker` whose wake operation notifies `on_wake`.
    pub fn as_waker(&self) -> Waker {
        unsafe {
            Waker::from_raw(Self::clone_waker(
                &self.inner as *const Arc<CondvarWakerInner> as *const (),
            ))
        }
    }
    /// RawWaker clone: reads the `Arc` behind the data pointer, clones it,
    /// and boxes the clone as the new waker's data pointer.
    unsafe fn clone_waker(inner: *const ()) -> RawWaker {
        unsafe {
            let value = &*(inner as *const Arc<CondvarWakerInner>);
            let data = Box::into_raw(Box::new(value.clone()));
            RawWaker::new(
                data as *const (),
                &RawWakerVTable::new(
                    Self::clone_waker,
                    Self::wake_waker,
                    Self::wake_by_ref_waker,
                    Self::drop_waker,
                ),
            )
        }
    }
    /// RawWaker wake-by-value: wake, then release the boxed `Arc`.
    unsafe fn wake_waker(inner: *const ()) {
        Self::wake_by_ref_waker(inner);
        Self::drop_waker(inner);
    }
    /// RawWaker wake-by-ref: notifies all waiters. The lock is acquired (and
    /// immediately released) so the notification cannot slip between a
    /// waiter's predicate check and its wait.
    #[allow(unused_variables)]
    unsafe fn wake_by_ref_waker(inner: *const ()) {
        let inner = &*(inner as *const Arc<CondvarWakerInner>);
        let guard = inner.lock.lock().expect("Could not lock mutex");
        inner.on_wake.notify_all();
    }
    /// RawWaker drop: reclaims the boxed `Arc` created by `clone_waker`.
    unsafe fn drop_waker(inner: *const ()) {
        drop(Box::from_raw(inner as *mut Arc<CondvarWakerInner>));
    }
}
impl Deref for CondvarWaker {
    type Target = CondvarWakerInner;
    /// Forwards to the shared mutex/condvar state.
    fn deref(&self) -> &Self::Target {
        self.inner.as_ref()
    }
}
/// Shared state behind a [`CondvarWaker`].
#[derive(Default)]
struct CondvarWakerInner {
    /// Paired with `on_wake`; held by waiters across their predicate check.
    lock: sync_impl::Mutex<()>,
    /// Signalled whenever the waker is woken.
    on_wake: sync_impl::Condvar,
}
/// First-in, first-out scheduling backing for a [`TaskQueue`].
#[derive(Debug)]
pub struct Fifo {
    /// Pending task control blocks in arrival order.
    inner: VecDeque<Arc<TaskControl>>,
}
impl QueueBacking for Fifo {
    /// Creates an empty FIFO queue.
    fn new() -> Self {
        Self {
            inner: VecDeque::default(),
        }
    }
    /// Reports whether no tasks are queued.
    fn is_empty(&self) -> bool {
        self.inner.len() == 0
    }
    /// Removes and returns the oldest queued task, if any.
    fn next(&mut self) -> Option<Arc<TaskControl>> {
        self.inner.pop_front()
    }
}
impl PushPopQueueBacking for Fifo {
    /// Appends a task to the back of the queue.
    fn push(&mut self, task: Arc<TaskControl>) {
        self.inner.push_back(task);
    }
}
/// Last-in, first-out (stack) scheduling backing for a [`TaskQueue`].
#[derive(Debug)]
pub struct Lifo {
    /// Pending task control blocks; the most recent push is served first.
    inner: Vec<Arc<TaskControl>>,
}
impl QueueBacking for Lifo {
    /// Creates an empty LIFO stack.
    fn new() -> Self {
        Self {
            inner: Vec::default(),
        }
    }
    /// Reports whether no tasks are queued.
    fn is_empty(&self) -> bool {
        self.inner.len() == 0
    }
    /// Removes and returns the most recently pushed task, if any.
    fn next(&mut self) -> Option<Arc<TaskControl>> {
        self.inner.pop()
    }
}
impl PushPopQueueBacking for Lifo {
    /// Pushes a task onto the top of the stack.
    fn push(&mut self, task: Arc<TaskControl>) {
        self.inner.push(task);
    }
}
/// Wrapper keying a [`TaskControl`] by `Arc` pointer identity, for use as a
/// [`PriorityQueue`] item.
#[derive(Debug)]
struct PriorityHolder(pub Arc<TaskControl>);
impl Deref for PriorityHolder {
    type Target = Arc<TaskControl>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
/// Equality is `Arc` pointer identity, matching the `Hash` impl below.
impl PartialEq for PriorityHolder {
    fn eq(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.0, &other.0)
    }
}
impl Eq for PriorityHolder {}
/// Hashes the `Arc`'s address, consistent with pointer-identity equality.
impl Hash for PriorityHolder {
    fn hash<H: Hasher>(&self, state: &mut H) {
        state.write_usize(Arc::as_ptr(&self.0) as usize)
    }
}
/// Priority-ordered scheduling backing for a [`TaskQueue`]; tasks with the
/// greatest priority value are served first.
#[derive(Debug)]
pub struct Priority<P: 'static + Ord + Send> {
    /// Max-priority queue keyed by control-block identity, using the fast
    /// (non-DoS-resistant) Fx hasher.
    inner: PriorityQueue<PriorityHolder, P, FxBuildHasher>,
}
impl<P: Ord + Send> QueueBacking for Priority<P> {
    /// Creates an empty priority queue.
    fn new() -> Self {
        let inner = PriorityQueue::default();
        Self { inner }
    }
    /// Reports whether no tasks are queued.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
    /// Removes and returns the highest-priority task, if any.
    fn next(&mut self) -> Option<Arc<TaskControl>> {
        self.inner.pop().map(|(holder, _priority)| holder.0)
    }
}
/// A cloneable queue of task collections that doubles as a [`WorkProvider`],
/// parameterized over its scheduling policy `B` ([`Fifo`], [`Lifo`], or
/// [`Priority`]).
#[derive(Debug)]
pub struct TaskQueue<B: QueueBacking> {
    /// Shared queue state; all clones of the queue share this holder.
    inner: Arc<TaskQueueHolder<B>>,
}
impl<B: PushPopQueueBacking> TaskQueue<B> {
    /// Runs `task` to completion — helping workers by executing its units on
    /// the current thread — and returns the collection's result.
    pub fn join<T>(&self, task: impl TaskCollection<T>) -> T {
        let work = Task::new(task, self.inner.clone());
        // Grab one unit up front so this thread is guaranteed to make
        // progress even if no worker ever picks the task up.
        let next_job = work.control.collection().next_task();
        self.push_control(work.control());
        if let Some(job) = next_job {
            job.execute();
        }
        work.join()
    }
    /// Queues `task` for execution and returns a handle to its result.
    pub fn spawn<T>(&self, task: impl TaskCollection<T>) -> Task<T, B> {
        let work = Task::new(task, self.inner.clone());
        self.push_control(work.control());
        work
    }
    /// Adds the control block to the backing queue, notifying consumers if
    /// the queue was previously empty (i.e. workers may be asleep).
    fn push_control(&self, control: Arc<TaskControl>) {
        // `expect` rather than the previous `unwrap_unchecked`: assuming a
        // poisoned mutex is unreachable would be undefined behavior, while
        // panicking is sound and loses nothing on the happy path.
        let mut queue = self
            .inner
            .inner
            .lock()
            .expect("Could not acquire task queue mutex.");
        if queue.queued.is_empty() {
            self.inner.notifier.notify();
        }
        queue.queued.push(control);
    }
}
impl<P: Ord + Send + Sync> TaskQueue<Priority<P>> {
    /// Runs `task` at `priority` to completion — helping workers by
    /// executing its units on the current thread — and returns the result.
    pub fn join<T>(&self, priority: P, task: impl TaskCollection<T>) -> T {
        let work = Task::new(task, self.inner.clone());
        // Grab one unit up front so this thread is guaranteed to make
        // progress even if no worker ever picks the task up.
        let next_job = work.control.collection().next_task();
        self.push_control(priority, work.control());
        if let Some(job) = next_job {
            job.execute();
        }
        work.join()
    }
    /// Queues `task` at `priority` and returns a handle to its result.
    pub fn spawn<T>(&self, priority: P, task: impl TaskCollection<T>) -> Task<T, Priority<P>> {
        let work = Task::new(task, self.inner.clone());
        self.push_control(priority, work.control());
        work
    }
    /// Inserts the control block keyed by `priority`, notifying consumers if
    /// the queue was previously empty (i.e. workers may be asleep).
    fn push_control(&self, priority: P, control: Arc<TaskControl>) {
        // `expect` rather than the previous `unwrap_unchecked`: assuming a
        // poisoned mutex is unreachable would be undefined behavior, while
        // panicking is sound and loses nothing on the happy path.
        let mut queue = self
            .inner
            .inner
            .lock()
            .expect("Could not acquire task queue mutex.");
        if queue.queued.is_empty() {
            self.inner.notifier.notify();
        }
        queue.queued.inner.push(PriorityHolder(control), priority);
    }
}
impl<B: QueueBacking> Default for TaskQueue<B> {
    /// Creates an empty queue with a fresh change notifier and no current
    /// task.
    fn default() -> Self {
        let state = TaskQueueInner {
            current: None,
            queued: B::new(),
        };
        let holder = TaskQueueHolder {
            notifier: ChangeNotifier::default(),
            inner: sync_impl::Mutex::new(state),
        };
        Self {
            inner: Arc::new(holder),
        }
    }
}
impl<B: QueueBacking> Clone for TaskQueue<B> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
impl<B: QueueBacking> WorkProvider for TaskQueue<B> {
    fn change_notifier(&self) -> &ChangeNotifier {
        &self.inner.notifier
    }
    /// Hands out the next work unit, draining the current collection before
    /// promoting the next queued one.
    fn next_task(&self) -> Option<Box<dyn WorkUnit>> {
        // `expect` rather than the previous `unwrap_unchecked`: assuming a
        // poisoned mutex is unreachable would be undefined behavior, while
        // panicking is sound and loses nothing on the happy path.
        let mut inner = self
            .inner
            .inner
            .lock()
            .expect("Could not acquire task queue mutex.");
        loop {
            if let Some(current) = &inner.current {
                // Register the unit-in-flight first; fails if the task was
                // cancelled, in which case we fall through to the next one.
                if current.increment_in_progress() {
                    if let Some(unit) = current.collection().next_task() {
                        let control = current.clone();
                        // Wrap the unit so completion bookkeeping runs after
                        // execution, outside this lock.
                        return Some(Box::new(move || {
                            unit.execute();
                            control.decrement_in_progress()
                        }));
                    } else {
                        // Collection exhausted: mark it cancelled and undo
                        // the in-progress registration taken above.
                        current.cancel();
                        current.decrement_in_progress();
                    }
                }
            }
            // Promote the next queued collection; `None` means the whole
            // queue is drained.
            inner.current = inner.queued.next();
            inner.current.as_ref()?;
        }
    }
}
/// Shared state behind a [`TaskQueue`].
#[derive(Debug)]
struct TaskQueueHolder<B: QueueBacking> {
    /// Fired when work arrives on a previously-empty queue.
    notifier: ChangeNotifier,
    /// The mutable queue state, guarded by a mutex.
    inner: sync_impl::Mutex<TaskQueueInner<B>>,
}
/// Mutable portion of a task queue.
#[derive(Debug)]
struct TaskQueueInner<B: QueueBacking> {
    /// The collection currently being drained, if any.
    current: Option<Arc<TaskControl>>,
    /// Collections waiting to become current, ordered by the backing policy.
    queued: B,
}
/// Creates a single-shot task collection that runs `f` exactly once and
/// yields its return value.
pub fn once<T: 'static + Send>(f: impl 'static + Send + FnOnce() -> T) -> impl TaskCollection<T> {
    // Marks a `Send` value as `Sync`; sound here because the wrapped value
    // is only ever moved out whole (via `Arc::into_inner`), never accessed
    // concurrently through shared references.
    struct SyncWrapper<T: Send>(T);
    unsafe impl<T: Send> Sync for SyncWrapper<T> {}
    // The collection: one take-once closure plus a slot for its result.
    struct OnceTask<T: 'static + Send> {
        // The single work unit; `TakeOwnCell` guarantees it is handed out at
        // most once.
        f: TakeOwnCell<Box<dyn WorkUnit>>,
        // Written by the work unit when it runs; drained by `result`.
        result: Arc<ArcSwapOption<SyncWrapper<T>>>,
    }
    impl<T: Send> TaskProvider for OnceTask<T> {
        fn next_task(&self) -> Option<Box<dyn WorkUnit>> {
            self.f.take()
        }
    }
    impl<T: Send> TaskCollection<T> for OnceTask<T> {
        fn result(&self) -> T {
            unsafe {
                // SAFETY: the swapped-out `Arc` is expected to be the sole
                // strong reference by the time the task is complete, so
                // `Arc::into_inner` must yield `Some`.
                Arc::into_inner(
                    self.result
                        .swap(None)
                        .expect("Task was not yet complete or already taken."),
                )
                .unwrap_unchecked()
                .0
            }
        }
    }
    unsafe impl<T: Send> Send for OnceTask<T> {}
    unsafe impl<T: Send> Sync for OnceTask<T> {}
    let result = Arc::new(ArcSwapOption::const_empty());
    let result_cloned = result.clone();
    OnceTask {
        f: TakeOwnCell::new(Box::new(move || {
            result_cloned.store(Some(Arc::new(SyncWrapper(f()))));
        })),
        result,
    }
}
/// Creates a task collection that runs every closure yielded by `f`,
/// producing their results in input order as a `Vec`.
pub fn many<T: 'static + Send>(
    f: impl IntoIterator<Item = impl 'static + Send + FnOnce() -> T>,
) -> impl TaskCollection<Vec<T>> {
    // Shared state: a claim cursor over the task list, a completion
    // countdown, and one output slot per task.
    struct ManyTaskInner<T: 'static + Send, F: 'static + Send + FnOnce() -> T> {
        // Index of the next task to hand out (monotonically increasing; may
        // run past the end once all tasks are claimed).
        next_task: AtomicU32,
        // Number of tasks that have not yet stored their result.
        remaining_tasks: AtomicU32,
        // The closures; each slot is logically moved out exactly once via
        // `assume_init_read` when its index is claimed.
        tasks_to_complete: Vec<MaybeUninit<F>>,
        // One uninitialized slot per task, written only by the unit that
        // claimed the matching index.
        results: UnsafeCell<Vec<UnsafeCell<MaybeUninit<T>>>>,
    }
    // SAFETY: every closure/result slot is accessed by exactly one claimant
    // (`next_task` hands out unique indices), so sharing across threads is
    // data-race free.
    unsafe impl<T: 'static + Send, F: 'static + Send + FnOnce() -> T> Send for ManyTaskInner<T, F> {}
    unsafe impl<T: 'static + Send, F: 'static + Send + FnOnce() -> T> Sync for ManyTaskInner<T, F> {}
    // The collection handle, sharing `ManyTaskInner` with outstanding units.
    struct ManyTask<T: 'static + Send, F: 'static + Send + FnOnce() -> T> {
        // Shared state for all work units of this collection.
        inner: Arc<ManyTaskInner<T, F>>,
    }
    impl<T: 'static + Send, F: 'static + Send + FnOnce() -> T> TaskProvider for ManyTask<T, F> {
        fn next_task(&self) -> Option<Box<dyn WorkUnit>> {
            unsafe {
                // Claim a unique index; an index past the end of the task
                // list means the collection is exhausted.
                let idx = self.inner.next_task.fetch_add(1, Ordering::AcqRel) as usize;
                if let Some(task) = self.inner.tasks_to_complete.get(idx) {
                    let inner = self.inner.clone();
                    // SAFETY: `idx` was claimed exactly once, so the closure
                    // is read out of its `MaybeUninit` slot exactly once.
                    let func = task.assume_init_read();
                    Some(Box::new(move || {
                        *(*inner.results.get()).get_unchecked(idx).get() = MaybeUninit::new(func());
                        inner.remaining_tasks.fetch_sub(1, Ordering::AcqRel);
                    }))
                } else {
                    None
                }
            }
        }
    }
    impl<T: 'static + Send, F: 'static + Send + FnOnce() -> T> TaskCollection<Vec<T>>
        for ManyTask<T, F>
    {
        fn result(&self) -> Vec<T> {
            unsafe {
                // Swapping in u32::MAX also poisons the counter so a second
                // call cannot pass this assertion.
                assert!(
                    self.inner.remaining_tasks.swap(u32::MAX, Ordering::Acquire) == 0,
                    "Task collection was not complete."
                );
                // SAFETY: every slot was initialized (remaining == 0), so
                // transmuting Vec<UnsafeCell<MaybeUninit<T>>> to Vec<T>
                // yields fully-initialized values of identical layout.
                transmute(self.inner.results.get().replace(Vec::new()))
            }
        }
    }
    let tasks_to_complete = f.into_iter().map(MaybeUninit::new).collect::<Vec<_>>();
    let mut results = Vec::with_capacity(tasks_to_complete.len());
    for _ in 0..tasks_to_complete.len() {
        results.push(UnsafeCell::new(MaybeUninit::uninit()));
    }
    ManyTask {
        inner: Arc::new(ManyTaskInner {
            next_task: AtomicU32::new(0),
            remaining_tasks: AtomicU32::new(tasks_to_complete.len() as u32),
            tasks_to_complete,
            results: UnsafeCell::new(results),
        }),
    }
}
/// Internal (sealed) queue traits and the per-task control block.
mod private {
    use super::*;
    /// A queue of task control blocks; kept in a private module so queue
    /// policies are limited to the types defined in this crate.
    pub trait QueueBacking: 'static + Send {
        /// Creates an empty queue.
        fn new() -> Self;
        /// Reports whether the queue holds no tasks.
        fn is_empty(&self) -> bool;
        /// Removes and returns the next task per the queue's policy.
        fn next(&mut self) -> Option<Arc<TaskControl>>;
    }
    /// A queue backing that accepts pushes without extra arguments (i.e. is
    /// not priority-keyed).
    pub trait PushPopQueueBacking: QueueBacking {
        /// Adds a task to the queue.
        fn push(&mut self, task: Arc<TaskControl>);
    }
    /// Shared bookkeeping for one spawned task collection.
    pub struct TaskControl {
        /// The collection producing this task's work units.
        collection: Box<dyn TaskProvider>,
        /// Low bits: count of units currently executing; top bit: the cancel
        /// flag (no further units may start).
        pub in_progress: AtomicUsize,
        /// Waker fired when the last in-flight unit of a cancelled task
        /// finishes (i.e. the task becomes complete).
        result_waker: ArcSwapOption<Waker>,
    }
    impl TaskControl {
        /// Top bit of `in_progress`; set once the task is cancelled/drained.
        const CANCEL_FLAG: usize = 1 << (usize::BITS - 1);
        /// Creates a control block for `provider` with no work in flight.
        pub fn new(provider: impl TaskProvider) -> Self {
            let collection = Box::new(provider);
            let in_progress = AtomicUsize::default();
            let result_waker = ArcSwapOption::const_empty();
            Self {
                collection,
                in_progress,
                result_waker,
            }
        }
        /// Marks the task cancelled; `increment_in_progress` will refuse to
        /// register new units from now on.
        pub fn cancel(&self) {
            self.in_progress
                .fetch_or(Self::CANCEL_FLAG, Ordering::Release);
        }
        /// Gets the underlying collection.
        pub fn collection(&self) -> &dyn TaskProvider {
            &*self.collection
        }
        /// A task is complete when it is cancelled AND no units are running,
        /// i.e. the counter is exactly the cancel flag.
        pub fn complete(&self) -> bool {
            self.in_progress.load(Ordering::Acquire) == Self::CANCEL_FLAG
        }
        /// Registers one executing unit; returns `false` (refusing the unit)
        /// if the task has been cancelled.
        pub fn increment_in_progress(&self) -> bool {
            self.in_progress
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
                    (x < Self::CANCEL_FLAG).then_some(x + 1)
                })
                .is_ok()
        }
        /// Unregisters one executing unit; if this was the last unit of a
        /// cancelled task, the task just became complete, so fire the waker.
        pub fn decrement_in_progress(&self) {
            if self.in_progress.fetch_sub(1, Ordering::AcqRel) == Self::CANCEL_FLAG + 1 {
                if let Some(value) = &*self.result_waker.load() {
                    value.wake_by_ref();
                }
            }
        }
        /// Installs the waker to fire upon completion, replacing any
        /// previously installed one.
        pub fn set_result_waker(&self, waker: Waker) {
            self.result_waker.store(Some(Arc::new(waker)));
        }
    }
    impl std::fmt::Debug for TaskControl {
        /// Formats the observable state; the collection itself is opaque.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TaskControl")
                .field("in_progress", &self.in_progress)
                .field("result_waker", &self.result_waker)
                .finish()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures_executor::*;
    /// Awaits a task executed by a background thread pool.
    async fn execute_background() {
        let queue = TaskQueue::<Fifo>::default();
        TaskPool::new(queue.clone(), 4).forget();
        assert_eq!(
            queue
                .spawn(once(|| {
                    println!("This will execute on background thread.");
                    2
                }))
                .await,
            2
        );
    }
    /// `many` preserves input order in its result vector, even when joined
    /// on the calling thread.
    #[test]
    fn execute_many() {
        let queue_a = TaskQueue::<Fifo>::default();
        assert_eq!(
            &[0, 3, 8, 15, 24][..],
            &queue_a.join(many((1..=5).map(|x| move || x * x - 1)))
        );
    }
    /// Two queues with different policies chained into one pool both
    /// complete their tasks.
    #[test]
    fn execute_double() {
        let queue_a = TaskQueue::<Fifo>::default();
        let queue_b = TaskQueue::<Lifo>::default();
        let first_task = queue_a.spawn(once(|| 2));
        let second_task = queue_b.spawn(once(|| 2));
        TaskPool::new(
            ChainedWorkProvider::default()
                .with(queue_a.clone())
                .with(queue_b.clone()),
            4,
        )
        .forget();
        assert_eq!(first_task.join(), second_task.join());
    }
    /// Repeatedly spawns/joins on a single-threaded chained pool to shake
    /// out sleep/wake races in the worker protocol.
    #[test]
    fn execute_double_twice() {
        let queue_a = TaskQueue::<Fifo>::default();
        let queue_b = TaskQueue::<Lifo>::default();
        let first_task = queue_a.spawn(once(|| 2));
        let second_task = queue_b.spawn(once(|| 2));
        TaskPool::new(
            ChainedWorkProvider::default()
                .with(queue_a.clone())
                .with(queue_b.clone()),
            1,
        )
        .forget();
        assert_eq!(first_task.join(), second_task.join());
        for _i in 0..1000 {
            let third_task =
                queue_a.spawn(once(|| std::thread::sleep(std::time::Duration::new(0, 10))));
            let fourth_task = queue_b.spawn(once(|| {
                std::thread::sleep(std::time::Duration::new(0, 200))
            }));
            assert_eq!(third_task.join(), fourth_task.join());
        }
    }
    /// Drives the async background test on a blocking executor.
    #[test]
    fn execute_background_blocking() {
        block_on(execute_background());
    }
}