use std::cmp::Ordering;
use std::future::{pending, Pending};
use std::mem::size_of;
use std::pin::Pin;
use std::sync::atomic::{self, AtomicBool};
use std::sync::Arc;
use std::thread::sleep;
use std::time::Duration;
use heph::actor::{self, Actor, NewActor};
use heph::supervisor::{NoSupervisor, Supervisor, SupervisorStrategy};
use mio::Token;
use crate::process::{ActorProcess, FutureProcess, Process, ProcessData, ProcessId, ProcessResult};
use crate::spawn::options::Priority;
use crate::test::{self, init_local_actor_with_inbox, AssertUnmoved, TEST_PID};
use crate::{RuntimeRef, ThreadLocal, ThreadSafe};
/// `ProcessId` compares by its wrapped number and displays as that number.
#[test]
fn pid() {
    // Identical ids compare equal.
    for raw in [0, 100] {
        assert_eq!(ProcessId(raw), ProcessId(raw));
    }

    // Ordering follows the wrapped value.
    assert!(ProcessId(0) < ProcessId(100));

    // `to_string` renders the plain number.
    for (raw, rendered) in [(0, "0"), (100, "100"), (8000, "8000")] {
        assert_eq!(ProcessId(raw).to_string(), rendered);
    }
}
/// `ProcessId` and `mio::Token` convert into each other without changing the
/// wrapped number.
#[test]
fn pid_and_evented_id() {
    // ProcessId -> Token.
    let token: Token = ProcessId(0).into();
    assert_eq!(token, Token(0));

    // Token -> ProcessId.
    let process_id: ProcessId = Token(0).into();
    assert_eq!(process_id, ProcessId(0));
}
/// Asserts that `T` occupies exactly `expected` bytes.
///
/// # Panics
///
/// Panics if `size_of::<T>()` differs from `expected`. The panic message
/// names the offending type and, thanks to `#[track_caller]`, the reported
/// location is the call site rather than this helper.
#[track_caller]
fn assert_size<T>(expected: usize) {
    assert_eq!(
        size_of::<T>(),
        expected,
        "unexpected size of `{}`",
        std::any::type_name::<T>()
    );
}
/// Pins the size in bytes of the process bookkeeping types so that a change
/// in their layout is noticed.
#[test]
fn size_assertions() {
    // Process identifier.
    assert_size::<ProcessId>(8);

    // Scheduling priority.
    assert_size::<Priority>(1);

    // Per-process scheduler data wrapping a boxed process.
    assert_size::<ProcessData<Box<dyn Process>>>(32);
}
/// Placeholder process for tests that only inspect `ProcessData` and never
/// run the process.
#[derive(Debug)]
struct NopTestProcess;

impl Process for NopTestProcess {
    /// Fixed name identifying this test process.
    fn name(&self) -> &'static str {
        "NopTestProcess"
    }

    /// Never expected to be called; panics if it is.
    fn run(self: Pin<&mut Self>, _runtime_ref: &mut RuntimeRef, _pid: ProcessId) -> ProcessResult {
        unimplemented!()
    }
}
/// Each `ProcessData` equals itself and differs from every other instance.
#[test]
fn process_data_equality() {
    let low = ProcessData::new(Priority::LOW, Box::pin(NopTestProcess));
    let normal = ProcessData::new(Priority::NORMAL, Box::pin(NopTestProcess));
    let high = ProcessData::new(Priority::HIGH, Box::pin(NopTestProcess));

    // Compare every pair, row by row: only the diagonal (a process compared
    // with itself) may be equal.
    let processes = [&low, &normal, &high];
    for (row, &lhs) in processes.iter().enumerate() {
        for (col, &rhs) in processes.iter().enumerate() {
            assert_eq!(
                lhs == rhs,
                row == col,
                "unexpected equality result for processes #{} and #{}",
                row,
                col
            );
        }
    }
}
/// Checks the ordering between processes of different priorities, first with
/// the `fair_runtime` that `ProcessData::new` assigns and then with every
/// `fair_runtime` set to the same zero duration.
#[test]
fn process_data_ordering() {
    use std::cmp::Ordering::{Equal, Greater, Less};

    // `EXPECTED[row][col]` is the required result of
    // `processes[row].cmp(processes[col])` for the order (high, normal, low).
    const EXPECTED: [[Ordering; 3]; 3] = [
        [Equal, Greater, Greater],
        [Less, Equal, Greater],
        [Less, Less, Equal],
    ];

    // Compares every pair of the given processes against `EXPECTED`.
    macro_rules! assert_expected_ordering {
        ($($process:ident),+) => {{
            let processes = [$(&$process),+];
            for (row, &lhs) in processes.iter().enumerate() {
                for (col, &rhs) in processes.iter().enumerate() {
                    assert_eq!(
                        lhs.cmp(rhs),
                        EXPECTED[row][col],
                        "unexpected ordering for processes #{} and #{}",
                        row,
                        col
                    );
                }
            }
        }};
    }

    let mut high = ProcessData::new(Priority::HIGH, Box::pin(NopTestProcess));
    let mut normal = ProcessData::new(Priority::NORMAL, Box::pin(NopTestProcess));
    let mut low = ProcessData::new(Priority::LOW, Box::pin(NopTestProcess));

    // Freshly created processes.
    assert_expected_ordering!(high, normal, low);

    // Same check once all processes share an identical fair runtime.
    let no_runtime = Duration::from_millis(0);
    high.fair_runtime = no_runtime;
    normal.fair_runtime = no_runtime;
    low.fair_runtime = no_runtime;
    assert_expected_ordering!(high, normal, low);
}
/// Test process that blocks the current thread for the wrapped duration on
/// every run and never completes.
#[derive(Debug)]
struct SleepyProcess(Duration);

impl Process for SleepyProcess {
    /// Fixed name identifying this test process.
    fn name(&self) -> &'static str {
        "SleepyProcess"
    }

    /// Sleeps for the configured duration, then reports `Pending`.
    fn run(self: Pin<&mut Self>, _runtime_ref: &mut RuntimeRef, _pid: ProcessId) -> ProcessResult {
        let nap: Duration = self.0;
        sleep(nap);
        ProcessResult::Pending
    }
}
/// Running a process must increase the `fair_runtime` recorded in its
/// `ProcessData`.
#[test]
fn process_data_runtime_increase() {
    const SLEEP_TIME: Duration = Duration::from_millis(10);
    // Non-zero starting value, so the test also covers a process that has
    // already accumulated some runtime.
    const INITIAL_RUNTIME: Duration = Duration::from_millis(10);

    let mut process = Box::pin(ProcessData::new(
        Priority::HIGH,
        Box::pin(SleepyProcess(SLEEP_TIME)),
    ));
    process.fair_runtime = INITIAL_RUNTIME;

    // `SleepyProcess` blocks for `SLEEP_TIME` and then returns `Pending`.
    let mut runtime_ref = test::runtime();
    let res = process.as_mut().run(&mut runtime_ref);
    assert_eq!(res, ProcessResult::Pending);
    assert!(process.fair_runtime >= SLEEP_TIME);

    // The check above alone cannot fail: `INITIAL_RUNTIME` already equals
    // `SLEEP_TIME`, so it holds even if `run` never touched `fair_runtime`.
    // Comparing against the starting value detects a missing update.
    // NOTE(review): assumes `ProcessData::run` only ever adds to
    // `fair_runtime` - confirm against its implementation.
    assert!(
        process.fair_runtime > INITIAL_RUNTIME,
        "fair_runtime did not increase: started at {:?}, now {:?}",
        INITIAL_RUNTIME,
        process.fair_runtime
    );
}
/// Actor that waits for a single `()` message, asserts it was received
/// successfully and then returns.
async fn ok_actor(mut ctx: actor::Context<(), ThreadLocal>) {
    let received = ctx.receive_next().await;
    assert_eq!(received, Ok(()));
}
/// An `ActorProcess` running `ok_actor` is `Pending` until a message is sent
/// and `Complete` once the message has been delivered.
#[test]
fn actor_process() {
    let new_actor = ok_actor as fn(_) -> _;
    let (actor, inbox, actor_ref) = init_local_actor_with_inbox(new_actor, ()).unwrap();
    let mut process = Box::pin(ActorProcess::new(NoSupervisor, new_actor, actor, inbox));
    let mut runtime_ref = test::runtime();

    // No message has been sent yet.
    assert_eq!(
        process.as_mut().run(&mut runtime_ref, TEST_PID),
        ProcessResult::Pending
    );

    // After sending the message the actor can finish.
    actor_ref.try_send(()).unwrap();
    assert_eq!(
        process.as_mut().run(&mut runtime_ref, TEST_PID),
        ProcessResult::Complete
    );
}
/// Actor that returns `Err(())` immediately when `fail` is true; otherwise it
/// waits for a single `()` message and returns `Ok(())`.
async fn error_actor(mut ctx: actor::Context<(), ThreadLocal>, fail: bool) -> Result<(), ()> {
    if fail {
        return Err(());
    }

    let received = ctx.receive_next().await;
    assert_eq!(received, Ok(()));
    Ok(())
}
/// When the actor fails and the supervisor answers `Stop`, the process is
/// `Complete` after a single run.
#[test]
fn erroneous_actor_process() {
    let new_actor = error_actor as fn(_, _) -> _;
    // `true` makes `error_actor` return an error right away.
    let (actor, inbox, _) = init_local_actor_with_inbox(new_actor, true).unwrap();

    // Supervisor that always stops the actor.
    let stop_supervisor = |_| SupervisorStrategy::Stop;
    let mut process = Box::pin(ActorProcess::new(stop_supervisor, new_actor, actor, inbox));

    let mut runtime_ref = test::runtime();
    assert_eq!(
        process.as_mut().run(&mut runtime_ref, TEST_PID),
        ProcessResult::Complete
    );
}
/// When the actor fails and the supervisor answers `Restart(false)`, the
/// supervisor is invoked, the process stays `Pending` after the first run and
/// becomes `Complete` once a message has been sent.
#[test]
fn restarting_erroneous_actor_process() {
    /// Supervisor that records being consulted in the shared flag and always
    /// restarts the actor with `false` as argument.
    struct TestSupervisor(Arc<AtomicBool>);

    impl TestSupervisor {
        /// Sets the shared flag to show the supervisor was consulted.
        fn mark_called(&self) {
            self.0.store(true, atomic::Ordering::SeqCst);
        }
    }

    impl<NA> Supervisor<NA> for TestSupervisor
    where
        NA: NewActor<Argument = bool>,
    {
        fn decide(&mut self, _: <NA::Actor as Actor>::Error) -> SupervisorStrategy<NA::Argument> {
            self.mark_called();
            SupervisorStrategy::Restart(false)
        }

        fn decide_on_restart_error(&mut self, _: NA::Error) -> SupervisorStrategy<NA::Argument> {
            self.mark_called();
            SupervisorStrategy::Restart(false)
        }

        fn second_restart_error(&mut self, _: NA::Error) {
            unreachable!("test call to second_restart_error in ActorProcess");
        }
    }

    let was_called = Arc::new(AtomicBool::new(false));
    let new_actor = error_actor as fn(_, _) -> _;
    // `true` makes the first incarnation of `error_actor` fail right away.
    let (actor, inbox, actor_ref) = init_local_actor_with_inbox(new_actor, true).unwrap();

    let process = ActorProcess::new(
        TestSupervisor(Arc::clone(&was_called)),
        new_actor,
        actor,
        inbox,
    );
    let mut process: Pin<Box<dyn Process>> = Box::pin(process);
    let mut runtime_ref = test::runtime();

    // First run: the process must still be pending and the supervisor must
    // have been consulted.
    assert_eq!(
        process.as_mut().run(&mut runtime_ref, ProcessId(0)),
        ProcessResult::Pending
    );
    assert!(was_called.load(atomic::Ordering::SeqCst));

    // Once a message has been sent the process can complete.
    actor_ref.try_send(()).unwrap();
    assert_eq!(
        process.as_mut().run(&mut runtime_ref, ProcessId(0)),
        ProcessResult::Complete
    );
}
/// `NewActor` implementation whose actor is a never-completing future wrapped
/// in `AssertUnmoved`.
struct TestAssertUnmovedNewActor;

impl NewActor for TestAssertUnmovedNewActor {
    type Message = ();
    type Argument = ();
    type Actor = AssertUnmoved<Pending<Result<(), !>>>;
    type Error = !;
    type RuntimeAccess = ThreadLocal;

    /// Always succeeds; the context and the argument are ignored.
    fn new(
        &mut self,
        _ctx: actor::Context<Self::Message, ThreadLocal>,
        _arg: Self::Argument,
    ) -> Result<Self::Actor, Self::Error> {
        let never_ready = pending();
        Ok(AssertUnmoved::new(never_ready))
    }
}
/// Runs an `ActorProcess` holding an `AssertUnmoved` actor several times; the
/// process must report `Pending` on every run.
#[test]
fn actor_process_assert_actor_unmoved() {
    let (actor, inbox, _) = init_local_actor_with_inbox(TestAssertUnmovedNewActor, ()).unwrap();
    let actor_process = ActorProcess::new(NoSupervisor, TestAssertUnmovedNewActor, actor, inbox);
    let mut process: Pin<Box<dyn Process>> = Box::pin(actor_process);
    let mut runtime_ref = test::runtime();

    // The wrapped future never completes, so every run must stay pending.
    for attempt in 0..3 {
        let res = process.as_mut().run(&mut runtime_ref, ProcessId(0));
        assert_eq!(res, ProcessResult::Pending, "run #{}", attempt);
    }
}
/// Runs a thread-local `FutureProcess` holding an `AssertUnmoved` future
/// several times; the process must report `Pending` on every run.
#[test]
fn future_process_thread_local_assert_future_unmoved() {
    let future_process = FutureProcess::<_, ThreadLocal>::new(AssertUnmoved::new(pending()));
    let mut process: Pin<Box<dyn Process>> = Box::pin(future_process);
    let mut runtime_ref = test::runtime();

    // The wrapped future never completes, so every run must stay pending.
    for attempt in 0..3 {
        let res = process.as_mut().run(&mut runtime_ref, ProcessId(0));
        assert_eq!(res, ProcessResult::Pending, "run #{}", attempt);
    }
}
/// Runs a thread-safe `FutureProcess` holding an `AssertUnmoved` future
/// several times; the process must report `Pending` on every run.
#[test]
fn future_process_thread_safe_assert_future_unmoved() {
    let future_process = FutureProcess::<_, ThreadSafe>::new(AssertUnmoved::new(pending()));
    let mut process: Pin<Box<dyn Process>> = Box::pin(future_process);
    let mut runtime_ref = test::runtime();

    // The wrapped future never completes, so every run must stay pending.
    for attempt in 0..3 {
        let res = process.as_mut().run(&mut runtime_ref, ProcessId(0));
        assert_eq!(res, ProcessResult::Pending, "run #{}", attempt);
    }
}