#![forbid(unsafe_code)]
use core::fmt::Debug;
use core::ops::Range;
use core::time::Duration;
use safina_executor::{get_thread_executor, Executor};
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
fn assert_in_range<T: PartialOrd + Debug>(range: Range<T>, value: &T) {
assert!(!range.is_empty(), "invalid range {:?}", range);
assert!(
range.contains(value),
"measured concurrency value {:?} out of range {:?}",
value,
range,
);
}
pub fn assert_elapsed(before: Instant, range_ms: Range<u64>) {
assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms);
let elapsed = before.elapsed();
let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end);
assert!(
duration_range.contains(&elapsed),
"{:?} elapsed, out of range {:?}",
elapsed,
duration_range
);
}
fn measure_async_concurrency(executor: &Arc<Executor>, num_jobs: usize) -> f32 {
const WAIT_DURATION: Duration = Duration::from_millis(100);
let before = Instant::now();
let receiver = {
let (sender, receiver) = std::sync::mpsc::channel();
for _ in 0..num_jobs {
let sender_clone = sender.clone();
executor.spawn(async move {
std::thread::sleep(WAIT_DURATION);
sender_clone.send(()).unwrap();
});
}
receiver
};
for _ in 0..num_jobs {
receiver.recv_timeout(Duration::from_millis(500)).unwrap();
}
let elapsed = before.elapsed();
elapsed.as_secs_f32() / WAIT_DURATION.as_secs_f32()
}
fn expect_async_concurrency(executor: &Arc<Executor>, num_tasks: usize) {
assert_in_range(
1.0_f32..1.90,
&measure_async_concurrency(executor, num_tasks),
);
assert_in_range(
2.0_f32..2.90,
&measure_async_concurrency(executor, num_tasks + 1),
);
}
async fn measure_blocking_concurrency(executor: &Arc<Executor>, num_jobs: usize) -> f32 {
const WAIT_DURATION: Duration = Duration::from_millis(100);
let before = Instant::now();
let mut receivers = Vec::new();
for _ in 0..num_jobs {
receivers.push(executor.schedule_blocking(|| std::thread::sleep(WAIT_DURATION)));
}
let deadline = Instant::now() + WAIT_DURATION * 3;
for mut receiver in receivers {
safina_timer::with_deadline(async move { receiver.async_recv().await }, deadline)
.await
.unwrap()
.unwrap();
}
let elapsed = before.elapsed();
elapsed.as_secs_f32() / WAIT_DURATION.as_secs_f32()
}
async fn expect_blocking_concurrency(executor: &Arc<Executor>, num_tasks: usize) {
assert_in_range(
1.0_f32..1.90,
&measure_blocking_concurrency(executor, num_tasks).await,
);
assert_in_range(
2.0_f32..2.90,
&measure_blocking_concurrency(executor, num_tasks + 1).await,
);
}
trait UnwindAnyToString {
fn any_to_string(&self) -> String;
}
impl UnwindAnyToString for Box<dyn std::any::Any + Send> {
fn any_to_string(&self) -> String {
if let Some(s) = self.downcast_ref::<&'static str>() {
(*s).to_string()
} else if let Some(s) = self.downcast_ref::<String>() {
s.clone()
} else {
format!("{:?}", self)
}
}
}
pub fn expect_elapsed(before: Instant, range_ms: Range<u64>) {
assert!(!range_ms.is_empty(), "invalid range {:?}", range_ms);
let elapsed = before.elapsed();
let duration_range = Duration::from_millis(range_ms.start)..Duration::from_millis(range_ms.end);
assert!(
duration_range.contains(&elapsed),
"{:?} elapsed, out of range {:?}",
elapsed,
duration_range
);
}
fn get_async_thread_name(executor: &Arc<Executor>) -> String {
let (sender, receiver) = std::sync::mpsc::channel();
executor.spawn(async move {
sender
.send(std::thread::current().name().unwrap().to_string())
.unwrap();
});
receiver.recv_timeout(Duration::from_millis(500)).unwrap()
}
async fn get_blocking_thread_name(executor: &Arc<Executor>) -> String {
executor
.schedule_blocking(|| std::thread::current().name().unwrap().to_string())
.async_recv()
.await
.unwrap()
}
#[test]
fn test_default() {
let executor = Executor::default();
expect_async_concurrency(&executor, 4);
assert_eq!(
"async",
get_async_thread_name(&executor)
.strip_suffix(|c| "0123".contains(c))
.unwrap()
);
executor.block_on(async {
let executor = get_thread_executor().unwrap();
expect_blocking_concurrency(&executor, 4).await;
assert_eq!(
"blocking",
get_blocking_thread_name(&executor)
.await
.strip_suffix(|c| "0123".contains(c))
.unwrap()
);
});
}
#[test]
#[allow(clippy::default_trait_access)]
fn test_default_trait() {
let executor: Arc<Executor> = Arc::new(Default::default());
expect_async_concurrency(&executor, 4);
assert_eq!(
"async",
get_async_thread_name(&executor)
.strip_suffix(|c| "0123".contains(c))
.unwrap()
);
let executor_clone = executor.clone();
executor.block_on(async move {
expect_blocking_concurrency(&executor_clone, 4).await;
assert_eq!(
"blocking",
get_blocking_thread_name(&executor_clone)
.await
.strip_suffix(|c| "0123".contains(c))
.unwrap()
);
});
}
#[test]
fn test_new() {
let executor = Executor::new(2, 2).unwrap();
expect_async_concurrency(&executor, 2);
assert_eq!(
"async",
get_async_thread_name(&executor)
.strip_suffix(|c| "01".contains(c))
.unwrap()
);
let executor_clone = executor.clone();
executor.block_on(async move {
expect_blocking_concurrency(&executor_clone, 2).await;
assert_eq!(
"blocking",
get_blocking_thread_name(&executor_clone)
.await
.strip_suffix(|c| "0123".contains(c))
.unwrap()
);
});
}
#[test]
fn test_with_name() {
let executor = Executor::with_name("exec", 2, "io", 2).unwrap();
expect_async_concurrency(&executor, 2);
assert_eq!(
"exec",
get_async_thread_name(&executor)
.strip_suffix(|c| "01".contains(c))
.unwrap()
);
let executor_clone = executor.clone();
executor.block_on(async move {
expect_blocking_concurrency(&executor_clone, 2).await;
assert_eq!(
"io",
get_blocking_thread_name(&executor_clone)
.await
.strip_suffix(|c| "0123".contains(c))
.unwrap()
);
});
}
#[test]
fn test_spawn_unpin() {
let executor = safina_executor::Executor::default();
let (sender, receiver) = std::sync::mpsc::channel();
executor.spawn(async move {
sender.send(()).unwrap();
});
receiver.recv().unwrap();
}
#[test]
fn test_spawn() {
let executor = safina_executor::Executor::default();
let (sender, receiver) = std::sync::mpsc::channel();
executor.spawn_unpin(Box::pin(async move {
sender.send(()).unwrap();
}));
receiver.recv().unwrap();
}
#[test]
fn test_waking() {
safina_timer::start_timer_thread();
let executor = safina_executor::Executor::default();
let (sender, receiver) = std::sync::mpsc::channel();
let before = Instant::now();
executor.spawn(async move {
safina_timer::sleep_for(Duration::from_millis(100)).await;
sender.send(()).unwrap();
});
receiver.recv().unwrap();
expect_elapsed(before, 100..200);
}
#[test]
fn test_wake_after_ready() {
struct FnFuture(Box<dyn (FnMut(&mut Context<'_>) -> Poll<()>) + Send>);
impl Future for FnFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.as_mut().0(cx)
}
}
safina_timer::start_timer_thread();
let executor = safina_executor::Executor::default();
let (sender, receiver) = std::sync::mpsc::channel();
let state = Arc::new(AtomicBool::new(true));
let fut = FnFuture(Box::new(move |cx| {
sender.send(cx.waker().clone()).unwrap();
if state.swap(false, Ordering::AcqRel) {
Poll::Ready(())
} else {
panic!("resumed after completion");
}
}));
executor.spawn(fut);
receiver.recv().unwrap().wake();
receiver.recv().unwrap_err();
}
#[test]
fn test_async_panic() {
let executor = safina_executor::Executor::new(1, 1).unwrap();
executor.spawn(async { panic!("ignore this panic") });
std::thread::sleep(Duration::from_millis(100));
let (sender, receiver) = std::sync::mpsc::channel();
executor.spawn(async move {
sender.send(()).unwrap();
});
receiver.recv().unwrap();
}
#[test]
fn test_executor_block_on() {
safina_timer::start_timer_thread();
let executor = safina_executor::Executor::new(1, 1).unwrap();
assert_eq!(3_u8, executor.block_on(async { 3_u8 }));
let value = 5_u8;
assert_eq!(5_u8, executor.block_on(async move { value }));
let before = Instant::now();
executor.block_on(async {
safina_timer::sleep_for(Duration::from_millis(100)).await;
});
expect_elapsed(before, 100..200);
assert_eq!(
"ignore this panic",
std::panic::catch_unwind(|| executor.block_on(async { panic!("ignore this panic") }))
.unwrap_err()
.any_to_string()
.as_str()
);
executor.block_on(async {
let (sender, receiver) = std::sync::mpsc::channel();
safina_executor::spawn(async move {
sender.send(()).unwrap();
});
receiver.recv().unwrap();
});
}
#[test]
fn test_block_on() {
safina_timer::start_timer_thread();
assert_eq!(3_u8, safina_executor::block_on(async { 3_u8 }));
let value = 5_u8;
assert_eq!(5_u8, safina_executor::block_on(async move { value }));
let before = Instant::now();
safina_executor::block_on(async {
safina_timer::sleep_for(Duration::from_millis(100)).await;
});
expect_elapsed(before, 100..200);
assert_eq!(
"ignore this panic",
std::panic::catch_unwind(|| safina_executor::block_on(async {
panic!("ignore this panic");
}))
.unwrap_err()
.any_to_string()
.as_str()
);
}
#[test]
fn test_block_on_unpin() {
assert_eq!(
3_u8,
safina_executor::block_on_unpin(Box::pin(async { 3_u8 }))
);
}
#[test]
fn test_schedule_blocking_simple() {
let executor = safina_executor::Executor::new(1, 1).unwrap();
executor.block_on(async {
assert_eq!(
3_u8,
safina_executor::schedule_blocking(|| 3_u8)
.async_recv()
.await
.unwrap()
);
});
}
#[test]
fn test_executor_schedule_blocking_simple() {
let executor = safina_executor::Executor::new(2, 2).unwrap();
let executor_clone = executor.clone();
executor.block_on(async move {
assert_eq!(
3_u8,
executor_clone
.schedule_blocking(|| 3_u8)
.async_recv()
.await
.unwrap()
);
});
}
#[test]
fn test_schedule_blocking_move() {
let executor = safina_executor::Executor::new(1, 1).unwrap();
executor.block_on(async {
let value = 5_u8;
assert_eq!(
5_u8,
safina_executor::schedule_blocking(move || value)
.async_recv()
.await
.unwrap()
);
});
}
#[test]
fn test_executor_schedule_blocking_move() {
let executor = safina_executor::Executor::new(2, 2).unwrap();
let executor_clone = executor.clone();
executor.block_on(async move {
let value = 5_u8;
assert_eq!(
5_u8,
executor_clone
.schedule_blocking(move || value)
.async_recv()
.await
.unwrap()
);
});
}
#[test]
fn test_schedule_blocking_sleep() {
let executor = safina_executor::Executor::new(1, 1).unwrap();
executor.block_on(async {
let before = Instant::now();
assert_eq!(
7_u8,
safina_executor::schedule_blocking(|| {
std::thread::sleep(Duration::from_millis(100));
7_u8
})
.async_recv()
.await
.unwrap()
);
expect_elapsed(before, 100..200);
});
}
#[test]
fn test_executor_schedule_blocking_sleep() {
let executor = safina_executor::Executor::new(2, 2).unwrap();
let executor_clone = executor.clone();
executor.block_on(async move {
let before = Instant::now();
assert_eq!(
7_u8,
executor_clone
.schedule_blocking(|| {
std::thread::sleep(Duration::from_millis(100));
7_u8
})
.async_recv()
.await
.unwrap()
);
expect_elapsed(before, 100..200);
});
}
#[test]
fn test_schedule_blocking_panic() {
let executor = safina_executor::Executor::new(1, 1).unwrap();
executor.block_on(async {
safina_executor::schedule_blocking(|| panic!("ignore this panic"))
.async_recv()
.await
.unwrap_err();
});
}
#[test]
fn test_executor_schedule_blocking_panic() {
let executor = safina_executor::Executor::new(2, 2).unwrap();
let executor_clone = executor.clone();
executor.block_on(async move {
executor_clone
.schedule_blocking(|| panic!("ignore this panic"))
.async_recv()
.await
.unwrap_err();
});
}
#[test]
fn test_schedule_blocking_recv() {
safina_timer::start_timer_thread();
let executor = safina_executor::Executor::new(1, 1).unwrap();
executor.block_on(async {
let before = Instant::now();
let (sender, receiver) = std::sync::mpsc::channel();
let mut result_receiver =
safina_executor::schedule_blocking(move || receiver.recv().unwrap());
safina_timer::sleep_for(Duration::from_millis(100)).await;
sender.send(9_u8).unwrap();
assert_eq!(9_u8, result_receiver.async_recv().await.unwrap());
expect_elapsed(before, 100..200);
});
}
#[test]
fn test_executor_schedule_blocking_recv() {
safina_timer::start_timer_thread();
let executor = safina_executor::Executor::new(2, 2).unwrap();
let executor_clone = executor.clone();
executor.block_on(async move {
let before = Instant::now();
let (sender, receiver) = std::sync::mpsc::channel();
let mut result_receiver =
executor_clone.schedule_blocking(move || receiver.recv().unwrap());
safina_timer::sleep_for(Duration::from_millis(100)).await;
sender.send(9_u8).unwrap();
assert_eq!(9_u8, result_receiver.async_recv().await.unwrap());
expect_elapsed(before, 100..200);
});
}
#[test]
fn no_executor_running() {
assert_eq!(
"called from outside a task; check for duplicate safina-executor crate: cargo tree -d",
std::panic::catch_unwind(
|| safina_executor::block_on(safina_executor::schedule_blocking(|| 3_u8))
)
.unwrap_err()
.any_to_string()
.as_str()
);
assert_eq!(
"called from outside a task; check for duplicate safina-executor crate: cargo tree -d",
std::panic::catch_unwind(|| safina_executor::block_on(async {
safina_executor::spawn(async {});
}))
.unwrap_err()
.any_to_string()
.as_str()
);
}