use super::*;
use std::future::Future;
/// Handle returned by [`FuturesExecutor::spawn`].
///
/// This is a plain alias for `async_task::Task<R>`, where `R` is the output
/// type of the spawned future. See the `async_task::Task` documentation for
/// what awaiting, detaching, or dropping the handle does.
pub type JoinHandle<R> = async_task::Task<R>;
/// An [`Executor`] that can also be handed futures to run.
///
/// Implementors must additionally be `Sync` and `'static`.
pub trait FuturesExecutor: Executor + Sync + 'static {
/// Spawns `future` on this executor and returns a [`JoinHandle`] for its output.
///
/// Both the future and its output type `R` must be `Send + 'static`.
/// The returned handle has type `JoinHandle<R>`, i.e. `async_task::Task<R>`.
fn spawn<R: Send + 'static>(
&self,
future: impl Future<Output = R> + 'static + Send,
) -> JoinHandle<R>;
}
#[cfg(test)]
mod tests {
    //! Tests for [`FuturesExecutor`] implementations.
    //!
    //! Every scenario is written once as a generic `*_executor` function and
    //! then instantiated for each thread pool flavour.

    use super::*;
    use futures::channel::oneshot::*;
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread,
        time::{Duration, Instant},
    };

    /// Longest time a test waits for a spawned future to raise its flag.
    ///
    /// Without this bound a broken executor would hang the test run forever
    /// instead of failing it.
    const COMPLETION_TIMEOUT: Duration = Duration::from_secs(5);

    /// Pause between two consecutive checks of a completion flag.
    const POLL_INTERVAL: Duration = Duration::from_millis(10);

    /// Delay between spawning a future and feeding its channel.
    ///
    /// NOTE(review): presumably this gives the executor time to poll the
    /// future while it is still pending -- confirm.
    const PENDING_GRACE: Duration = Duration::from_millis(100);

    /// Blocks the calling thread until `flag` becomes `true`.
    ///
    /// # Panics
    ///
    /// Panics if the flag is still `false` after [`COMPLETION_TIMEOUT`].
    fn wait_until_set(flag: &AtomicBool) {
        let deadline = Instant::now() + COMPLETION_TIMEOUT;
        while !flag.load(Ordering::SeqCst) {
            assert!(
                Instant::now() < deadline,
                "spawned future did not complete within {:?}",
                COMPLETION_TIMEOUT
            );
            thread::sleep(POLL_INTERVAL);
        }
    }

    /// Future that completes immediately, flipping `barrier` from `false` to `true`.
    ///
    /// Asserts that the flag had not been set before.
    async fn just_succeed(barrier: Arc<AtomicBool>) {
        let res = barrier.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
        assert!(res.is_ok());
    }

    #[test]
    fn test_async_ready_ccp() {
        let exec = crate::crossbeam_channel_pool::ThreadPool::new(2);
        test_async_ready_executor(&exec);
        exec.shutdown().expect("shutdown");
    }

    #[test]
    fn test_async_ready_cwp() {
        let exec = crate::crossbeam_workstealing_pool::small_pool(2);
        test_async_ready_executor(&exec);
        exec.shutdown().expect("shutdown");
    }

    /// Spawns a future that is ready on first poll, detaches its handle, and
    /// waits for the future to signal completion.
    fn test_async_ready_executor<E>(exec: &E)
    where
        E: FuturesExecutor,
    {
        let barrier = Arc::new(AtomicBool::new(false));
        exec.spawn(just_succeed(Arc::clone(&barrier))).detach();
        wait_until_set(&barrier);
    }

    /// Future that first waits for a message on `receiver` and then flips
    /// `barrier` from `false` to `true`.
    ///
    /// Panics with `"message"` if the sending side is dropped without sending.
    async fn wait_for_channel(receiver: Receiver<()>, barrier: Arc<AtomicBool>) {
        receiver.await.expect("message");
        let res = barrier.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
        assert!(res.is_ok());
    }

    #[test]
    fn test_async_pending_ccp() {
        let exec = crate::crossbeam_channel_pool::ThreadPool::new(2);
        test_async_pending_executor(&exec);
        exec.shutdown().expect("shutdown");
    }

    #[test]
    fn test_async_pending_cwp() {
        let exec = crate::crossbeam_workstealing_pool::small_pool(2);
        test_async_pending_executor(&exec);
        exec.shutdown().expect("shutdown");
    }

    /// Spawns a future that waits on a oneshot channel, sends the message
    /// after a short delay, and waits for the future to signal completion.
    fn test_async_pending_executor<E>(exec: &E)
    where
        E: FuturesExecutor,
    {
        let barrier = Arc::new(AtomicBool::new(false));
        let (tx, rx) = channel();
        exec.spawn(wait_for_channel(rx, Arc::clone(&barrier))).detach();
        thread::sleep(PENDING_GRACE);
        tx.send(()).expect("sent");
        wait_until_set(&barrier);
    }

    #[test]
    fn test_async_result_ccp() {
        let exec = crate::crossbeam_channel_pool::ThreadPool::new(2);
        test_async_result_executor(&exec);
        exec.shutdown().expect("shutdown");
    }

    #[test]
    fn test_async_result_cwp() {
        let exec = crate::crossbeam_workstealing_pool::small_pool(2);
        test_async_result_executor(&exec);
        exec.shutdown().expect("shutdown");
    }

    /// Spawns a future that resolves to a string received over a oneshot
    /// channel and checks that the join handle yields exactly that string.
    ///
    /// NOTE(review): `block_on` has no timeout, so this test still blocks
    /// indefinitely if the executor never completes the task.
    fn test_async_result_executor<E>(exec: &E)
    where
        E: FuturesExecutor,
    {
        let expected = "test";
        let (tx, rx) = channel::<String>();
        let handle = exec.spawn(async move { rx.await.expect("message") });
        thread::sleep(PENDING_GRACE);
        tx.send(expected.to_string()).expect("sent");
        let res = futures::executor::block_on(handle);
        assert_eq!(res, expected);
    }
}