#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
use std::{
future::Future,
pin::Pin,
sync::atomic::{AtomicUsize, Ordering},
task::{Context, Poll},
thread,
time::Duration
};
#[cfg(feature = "cleaver")]
use std::io;
use std::panic::UnwindSafe;
#[cfg(feature = "cleaver")]
use bytes::Bytes;
use futures_executor as futr_exec;
use futures_util::future::FutureExt;
use futures_util::task::SpawnExt;
#[cfg(any(feature = "cleaver", feature = "yield-stream"))]
use futures_util::{stream, stream::StreamExt};
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
use lazy_static::lazy_static;
#[cfg(feature="tokio-threaded")]
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
use futures_util::stream::FuturesUnordered;
use log::debug;
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
use log::info;
use crate::*;
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
lazy_static! {
static ref TEST_SET: Semaphore = Semaphore::default_new(1);
}
#[allow(dead_code)] fn is_send<T: Send>() -> bool { true }
#[allow(dead_code)] fn is_sync<T: Sync>() -> bool { true }
#[allow(dead_code)] fn is_unwind_safe<T: UnwindSafe>() -> bool { true }
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#[test]
fn test_blocking_permit_traits() {
assert!(is_send::<BlockingPermit<'_>>());
assert!(is_send::<BlockingPermitFuture<'_>>());
#[cfg(features = "tokio-semaphore")] {
assert!(is_sync::<BlockingPermitFuture<'_>>());
}
assert!(is_send::<SyncBlockingPermitFuture<'_>>());
assert!(is_sync::<SyncBlockingPermitFuture<'_>>());
}
fn log_init() {
piccolog::test_logger();
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
fn register_dispatch_pool() {
let pool = DispatchPool::builder()
.pool_size(2)
.after_start(|i| debug!("starting DispatchPool thread {}", i))
.before_stop(|i| debug!("dropping DispatchPool thread {}", i))
.create();
debug!("default pool: {:?}", pool);
crate::register_dispatch_pool(pool);
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
fn maybe_register_dispatch_pool() {
#[cfg(feature="current-thread")] {
register_dispatch_pool();
}
}
#[test]
fn dispatch_panic_returns_canceled() {
log_init();
let pool = DispatchPool::builder()
.pool_size(1)
.ignore_panics(true)
.after_start(|i| debug!("starting DispatchPool thread {}", i))
.before_stop(|i| debug!("dropping DispatchPool thread {}", i))
.create();
debug!("pool: {:?}", pool);
crate::register_dispatch_pool(pool);
let task = dispatch_rx(|| {
debug!("about to panic");
panic!("you asked for it");
}).unwrap();
let res = futr_exec::block_on(task);
match res {
Err(e @ Canceled) => debug!("return from panic dispatch: {}", e),
Ok(_) => panic!("should not succeed")
}
deregister_dispatch_pool();
}
fn spawn_good_and_bad_tasks() {
let mut lp = futr_exec::LocalPool::new();
let spawner = lp.spawner();
spawner.spawn(
dispatch_rx(|| {
debug!("about to panic");
panic!("you asked for it");
})
.unwrap()
.map(|r| {
if let Err(e) = r {
debug!("panicky dispatch_rx err: {}", e)
} else {
panic!("should not succeed");
}
})
).expect("spawn 1");
spawner.spawn(
dispatch_rx(|| {
debug!("anyone here to run me?");
})
.unwrap()
.map(|r| {
if let Err(e) = r {
panic!("normal dispatch_rx err: {}", e)
}
})
).expect("spawn 2");
lp.run();
}
#[test]
fn survives_dispatch_panic_catch_unwind() {
log_init();
let pool = DispatchPool::builder()
.pool_size(1)
.ignore_panics(true)
.after_start(|i| debug!("starting DispatchPool thread {}", i))
.before_stop(|i| debug!("dropping DispatchPool thread {}", i))
.create();
debug!("pool: {:?}", pool);
crate::register_dispatch_pool(pool);
spawn_good_and_bad_tasks();
deregister_dispatch_pool();
}
#[test]
fn survives_dispatch_panic_catch_unwind_on_caller() {
log_init();
let pool = DispatchPool::builder()
.queue_length(0)
.ignore_panics(true)
.after_start(|i| debug!("starting DispatchPool thread {}", i))
.before_stop(|i| debug!("dropping DispatchPool thread {}", i))
.create();
debug!("pool: {:?}", pool);
crate::register_dispatch_pool(pool);
spawn_good_and_bad_tasks();
deregister_dispatch_pool();
}
#[cfg(feature="impossible_abort_test")]
#[test]
fn aborts_dispatch_panic() {
log_init();
let pool = DispatchPool::builder()
.pool_size(1)
.queue_length(0)
.ignore_panics(false) .after_start(|i| debug!("starting DispatchPool thread {}", i))
.before_stop(|i| debug!("dropping DispatchPool thread {}", i))
.create();
debug!("pool: {:?}", pool);
crate::register_dispatch_pool(pool);
spawn_good_and_bad_tasks();
deregister_dispatch_pool();
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#[test]
fn runs_oldest_on_caller() {
log_init();
let pool = DispatchPool::builder()
.pool_size(1)
.queue_length(1)
.after_start(|i| debug!("starting DispatchPool thread {}", i))
.before_stop(|i| debug!("dropping DispatchPool thread {}", i))
.create();
debug!("pool: {:?}", pool);
crate::register_dispatch_pool(pool);
let mut lp = futr_exec::LocalPool::new();
let spawner = lp.spawner();
spawner.spawn(
dispatch_rx(|| {
assert!(
thread::current().name()
.unwrap()
.contains(&"dpx-")
);
thread::sleep(Duration::from_millis(100));
})
.unwrap()
.map(|_r| ())
).expect("spawn 1");
thread::sleep(Duration::from_millis(10));
spawner.spawn(
dispatch_rx(|| {
assert!(
thread::current().name()
.unwrap()
.contains(&"runs_oldest_on_caller")
);
thread::sleep(Duration::from_millis(100));
})
.unwrap()
.map(|_r| ())
).expect("spawn 2");
spawner.spawn(
dispatch_rx(|| {
assert!(
thread::current().name()
.unwrap()
.contains(&"dpx-")
);
})
.unwrap()
.map(|_r| ())
).expect("spawn 3");
lp.run();
deregister_dispatch_pool();
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
struct TestFuture<'a> {
delegate: Delegate<'a>,
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
enum Delegate<'a> {
Dispatch(Dispatched<usize>),
Permit(BlockingPermitFuture<'a>),
None
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
impl<'a> TestFuture<'a> {
fn new() -> Self {
TestFuture { delegate: Delegate::None }
}
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
impl<'a> Future for TestFuture<'a> {
type Output = Result<usize, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Self::Output>
{
let this = unsafe { self.get_unchecked_mut() };
match this.delegate {
Delegate::None => {
let s = "dispatched S".to_owned();
let drx = dispatch_rx(move || -> usize {
info!("do some blocking stuff ({})", s);
thread::sleep(Duration::from_millis(100));
42
});
match drx {
DispatchRx::Dispatch(disp) => {
this.delegate = Delegate::Dispatch(disp);
}
DispatchRx::NotRegistered(_) => {
let f = blocking_permit_future(&TEST_SET);
this.delegate = Delegate::Permit(f);
}
}
let s = unsafe { Pin::new_unchecked(this) };
s.poll(cx) }
Delegate::Dispatch(ref mut disp) => {
info!("delegate poll to Dispatched");
let disp = unsafe { Pin::new_unchecked(disp) };
disp.poll(cx)
}
Delegate::Permit(ref mut pf) => {
info!("delegate poll to BlockingPermitFuture");
let pf = unsafe { Pin::new_unchecked(pf) };
match pf.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Ok(p)) => {
p.enter();
info!("do some blocking stuff (permitted)");
Poll::Ready(Ok(41))
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e))
}
}
}
}
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#[test]
fn manual_future() {
log_init();
maybe_register_dispatch_pool();
let val = futr_exec::block_on(TestFuture::new()).expect("success");
assert!(val == 41 || val == 42);
deregister_dispatch_pool();
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#[test]
fn async_block_await_semaphore() {
log_init();
let task = async {
let mut _i = 0;
let drx = dispatch_rx(move || -> usize {
info!("do some blocking stuff (dispatched)");
_i = 1;
41
});
match drx {
DispatchRx::Dispatch(disp) => {
disp .await
}
DispatchRx::NotRegistered(_) => {
let permit = blocking_permit_future(&TEST_SET) .await?;
permit.enter();
info!("do some blocking stuff (permitted)");
Ok(42)
}
}
};
maybe_register_dispatch_pool();
let val = futr_exec::block_on(task).expect("task success");
assert!(val == 41 || val == 42);
deregister_dispatch_pool();
}
#[cfg(feature = "futures-intrusive")]
#[test]
fn async_block_with_macro_semaphore() {
log_init();
register_dispatch_pool();
let task = async {
dispatch_or_permit!(
&TEST_SET,
|| -> Result::<usize, Canceled> {
info!("do some blocking stuff, here or there");
Ok(41)
}
)
};
let val = futr_exec::block_on(task).expect("task success");
assert_eq!(val, 41);
deregister_dispatch_pool();
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#[test]
fn test_futr_local_pool() {
log_init();
register_dispatch_pool();
static FINISHED: AtomicUsize = AtomicUsize::new(0);
let mut pool = futr_exec::LocalPool::new();
let sp = pool.spawner();
for _ in 0..1000 {
sp.spawn(async {
dispatch_or_permit!(&TEST_SET, || -> Result<usize, Canceled> {
info!("do some blocking stuff, here or there");
Ok(41)
})
}.map(|r| {
assert_eq!(41, r.expect("dispatch_or_permit future"));
FINISHED.fetch_add(1, Ordering::SeqCst);
()
})).unwrap();
}
pool.run();
assert_eq!(1000, FINISHED.load(Ordering::SeqCst));
deregister_dispatch_pool();
}
#[cfg(any(feature = "tokio-semaphore", feature = "futures-intrusive"))]
#[cfg(feature="tokio-threaded")]
#[test]
fn test_tokio_threadpool() {
use futures_util::stream::StreamExt;
log_init();
lazy_static! {
static ref TEST_SET: Semaphore = Semaphore::default_new(3);
}
static FINISHED: AtomicUsize = AtomicUsize::new(0);
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(3)
.max_blocking_threads(1)
.build()
.unwrap();
let futures: FuturesUnordered<_> = (0..1000).map(|_| {
let j = async {
dispatch_or_permit!(&TEST_SET, || -> Result<usize, Canceled> {
info!("do some blocking stuff - {:?}",
thread::current().id());
thread::sleep(Duration::from_millis(1));
Ok(41)
})
}.map(|r| {
assert_eq!(41, r.expect("dispatch_or_permit future"));
FINISHED.fetch_add(1, Ordering::SeqCst);
()
});
rt.spawn(j) }).collect();
let join = rt.spawn(async {
let c = futures.collect::<Vec<_>>() .await;
assert_eq!(1000, c.iter().filter(|r| r.is_ok()).count());
});
rt.block_on(join).unwrap();
assert_eq!(1000, FINISHED.load(Ordering::SeqCst));
}
#[cfg(feature="cleaver")]
#[test]
fn test_cleaver_empty() {
let task = async {
let bstream = stream::empty();
let cleaver = super::Cleaver::new(bstream, 1);
cleaver.collect::<Vec<Result<Bytes,io::Error>>>() .await
};
let collected = futr_exec::block_on(task);
assert_eq!(collected.len(), 0);
}
#[cfg(feature="cleaver")]
#[test]
fn test_cleaver_empty_bytes() {
let task = async {
let bstream = stream::once(async { Ok(Bytes::new()) });
let cleaver = super::Cleaver::new(bstream, 1);
cleaver.collect::<Vec<Result<Bytes,io::Error>>>() .await
};
let collected = futr_exec::block_on(task);
assert_eq!(collected.len(), 1);
assert_eq!(collected[0].as_ref().unwrap().len(), 0);
}
#[cfg(feature="cleaver")]
#[test]
fn test_cleaver_through() {
let task = async {
let bstream = stream::once(async { Ok(Bytes::from("foobar")) });
let cleaver = super::Cleaver::new(bstream, 9);
cleaver.collect::<Vec<Result<Bytes,io::Error>>>() .await
};
let collected = futr_exec::block_on(task);
assert_eq!(collected.len(), 1);
assert_eq!(collected[0].as_ref().unwrap().as_ref(), b"foobar");
}
#[cfg(feature="cleaver")]
#[test]
fn test_cleaver_more() {
let task = async {
let bstream = stream::once(async { Ok(Bytes::from("foobar")) });
let cleaver = super::Cleaver::new(bstream, 4);
cleaver.collect::<Vec<Result<Bytes,io::Error>>>() .await
};
let collected = futr_exec::block_on(task);
assert_eq!(collected.len(), 2);
assert_eq!(collected[0].as_ref().unwrap().as_ref(), b"foob");
assert_eq!(collected[1].as_ref().unwrap().as_ref(), b"ar");
}
#[cfg(feature="yield-stream")]
#[test]
fn test_yield_stream_empty() {
let task = async {
let bstream = stream::empty::<usize>();
let ystream = super::YieldStream::new(bstream);
ystream.collect::<Vec<usize>>() .await
};
let collected = futr_exec::block_on(task);
assert_eq!(collected.len(), 0);
}
#[cfg(feature="yield-stream")]
#[test]
fn test_yield_stream_multiple() {
let task = async {
let bstream = stream::iter(vec![1, 2, 3]);
let ystream = super::YieldStream::new(bstream);
ystream.collect::<Vec<usize>>() .await
};
let collected = futr_exec::block_on(task);
assert_eq!(vec![1, 2, 3], collected);
}
#[cfg(feature="yield-stream")]
#[test]
fn test_yield_stream_stepping() {
use std::pin::Pin;
use futures_core::stream::Stream;
use futures_core::task::{Context, Poll::*};
use futures_util::task::noop_waker;
let bstream = stream::iter(vec![1, 2, 3]);
let mut ystream = super::YieldStream::new(bstream);
let waker = noop_waker();
let mut ctx = Context::from_waker(&waker);
for i in 1..=3 {
assert_eq!(Pin::new(&mut ystream).poll_next(&mut ctx), Ready(Some(i)));
assert_eq!(Pin::new(&mut ystream).poll_next(&mut ctx), Pending);
}
assert_eq!(Pin::new(&mut ystream).poll_next(&mut ctx), Ready(None));
assert_eq!(Pin::new(&mut ystream).poll_next(&mut ctx), Ready(None));
}