use std::time::Duration;
use lunatic::ap::handlers::{Message, Request};
use lunatic::ap::{AbstractProcess, Config, MessageHandler, ProcessRef, RequestHandler, State};
use lunatic::serializer::{Json, MessagePack};
use lunatic::supervisor::{Supervisor, SupervisorConfig, SupervisorStrategy};
use lunatic::{sleep, spawn, test, ProcessConfig};
/// Registry name under which the shared `Logger` process is started and
/// later looked up by the `A` processes.
const LOGGER_NAME: &str = "logger/assert_order";
/// Lifecycle event reported to the `Logger` process; the `char` payload is
/// the `name` of the `A` process that reported it.
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
enum LogEvent {
/// Sent from `A::init`.
Init(char),
/// Sent by the `Panic` message handler right before it panics.
Panic(char),
/// Sent from `A::terminate`.
Shutdown(char),
}
/// State of the logger process used by the tests to assert event ordering.
struct Logger {
/// Events in the order they were received; appended to by the `LogEvent`
/// handler and emptied by the `TakeLogs` handler.
logs: Vec<LogEvent>,
}
/// Runs `Logger` as an abstract process that answers `LogEvent` and
/// `TakeLogs` requests.
impl AbstractProcess for Logger {
    type Arg = ();
    type State = Logger;
    type Serializer = Json;
    type Handlers = (Request<LogEvent>, Request<TakeLogs>);
    type StartupError = ();

    /// Builds the initial state with an empty log; always succeeds.
    fn init(_: Config<Logger>, _arg: Self::Arg) -> Result<Self::State, ()> {
        let empty = Logger { logs: Vec::new() };
        Ok(empty)
    }
}
/// Handles a `LogEvent` request by recording the event.
impl RequestHandler<LogEvent> for Logger {
    type Response = ();

    /// Appends `event` to the end of the log.
    fn handle(mut state: State<Self>, event: LogEvent) -> Self::Response {
        state.logs.push(event)
    }
}
/// Request asking the `Logger` to hand over everything it has recorded.
#[derive(serde::Serialize, serde::Deserialize)]
struct TakeLogs;

impl RequestHandler<TakeLogs> for Logger {
    type Response = Vec<LogEvent>;

    /// Returns every event recorded since the previous `TakeLogs` request and
    /// leaves the log empty, so consecutive requests never see the same event
    /// twice.
    fn handle(mut state: State<Self>, _request: TakeLogs) -> Self::Response {
        // `take` swaps in `Vec::default()`, which is exactly the empty vector
        // the previous `mem::replace(.., vec![])` installed.
        std::mem::take(&mut state.logs)
    }
}
/// State of the child process that the supervisors in this file manage.
struct A {
/// Counter value; seeded from the start argument, incremented by `Inc`
/// and read back by `Count`.
count: u32,
/// Single-character label included in every event sent to the logger.
name: char,
}
/// Runs `A` as an abstract process started from a `(count, name)` pair.
impl AbstractProcess for A {
    type Arg = (u32, char);
    type State = A;
    type Serializer = MessagePack;
    type Handlers = (
        Message<Inc>,
        Request<Count>,
        Message<Panic>,
        Request<GetEnvVar>,
    );
    type StartupError = ();

    /// Builds the state from the start argument and, when a logger is
    /// registered under `LOGGER_NAME`, reports `LogEvent::Init` to it first.
    fn init(_: Config<Self>, (start, label): Self::Arg) -> Result<A, ()> {
        // Logging is best effort: without a registered logger nothing is sent.
        if let Some(logger) = ProcessRef::<Logger>::lookup(&LOGGER_NAME) {
            logger.request(LogEvent::Init(label));
        }
        Ok(A {
            count: start,
            name: label,
        })
    }

    /// Reports `LogEvent::Shutdown` to the logger, if one is registered.
    fn terminate(state: Self::State) {
        if let Some(logger) = ProcessRef::<Logger>::lookup(&LOGGER_NAME) {
            logger.request(LogEvent::Shutdown(state.name));
        }
    }
}
/// Message asking an `A` process to increment its counter.
#[derive(serde::Serialize, serde::Deserialize)]
struct Inc;
impl MessageHandler<Inc> for A {
/// Adds one to the stored count.
fn handle(mut state: State<Self>, _: Inc) {
state.count += 1;
}
}
/// Request asking an `A` process for its current counter value.
#[derive(serde::Serialize, serde::Deserialize)]
struct Count;
impl RequestHandler<Count> for A {
type Response = u32;
/// Returns the stored count without modifying it.
fn handle(state: State<Self>, _: Count) -> u32 {
state.count
}
}
/// Message that makes the receiving `A` process panic on purpose.
#[derive(serde::Serialize, serde::Deserialize)]
struct Panic;

impl MessageHandler<Panic> for A {
    /// Reports `LogEvent::Panic` to the logger (when one is registered under
    /// `LOGGER_NAME`) and then panics unconditionally.
    fn handle(state: State<Self>, _: Panic) {
        if let Some(logger) = ProcessRef::<Logger>::lookup(&LOGGER_NAME) {
            logger.request(LogEvent::Panic(state.name));
        }
        panic!();
    }
}
/// Request asking an `A` process for the value of the environment variable
/// whose name is carried in the tuple field.
#[derive(serde::Serialize, serde::Deserialize)]
struct GetEnvVar(String);

impl RequestHandler<GetEnvVar> for A {
    type Response = Option<String>;

    /// Returns `Some(value)` when the variable is set in this process's
    /// environment and `None` otherwise.
    fn handle(_: State<Self>, env_var: GetEnvVar) -> Option<String> {
        // A direct lookup avoids materialising every variable and, unlike
        // iterating `std::env::vars()`, cannot panic because some unrelated
        // variable holds non-Unicode data.
        std::env::var(&env_var.0).ok()
    }
}
// A single supervised child that panics is expected to come back with its
// original start arguments, i.e. counting from 4 again.
#[test]
fn one_failing_process() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A,);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForOne);
            config.set_args(((4, ' '),));
        }
    }

    // Checks that `child` reports each value of `expected` in turn, sending
    // one `Inc` after every check.
    fn count_through(child: &ProcessRef<A>, expected: std::ops::Range<u32>) {
        for value in expected {
            assert_eq!(value, child.request(Count));
            child.send(Inc);
        }
    }

    let sup = Sup::link().start(()).unwrap();

    let child = sup.children().0;
    count_through(&child, 4..30);

    child.send(Panic);
    // Presumably gives the supervisor time to restart the child.
    sleep(Duration::from_millis(10));

    // Fetch the reference again after the restart.
    let child = sup.children().0;
    count_through(&child, 4..30);
}
// With `OneForOne`, only the child that panicked is expected to restart; its
// sibling keeps its state and emits no log events.
#[test]
fn two_failing_process_one_for_one() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A, A);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForOne);
            config.set_args(((33, 'a'), (44, 'b')));
        }
    }

    // Checks that `child` reports each value of `expected` in turn, sending
    // one `Inc` after every check.
    fn count_through(child: &ProcessRef<A>, expected: std::ops::Range<u32>) {
        for value in expected {
            assert_eq!(value, child.request(Count));
            child.send(Inc);
        }
    }

    let logger = Logger::link().start_as(&LOGGER_NAME, ()).unwrap();
    let sup = Sup::link().start(()).unwrap();

    let (a, b) = sup.children();
    count_through(&a, 33..36);
    count_through(&b, 44..88);

    // Crash `b`: only `b` should be re-initialised.
    b.send(Panic);
    sleep(Duration::from_millis(10));
    let recorded = logger.request(TakeLogs);
    assert_eq!(
        recorded,
        [
            LogEvent::Init('a'),
            LogEvent::Init('b'),
            LogEvent::Panic('b'),
            LogEvent::Init('b'),
        ]
    );

    // `a` continues from 36, `b` starts over at 44.
    let (a, b) = sup.children();
    count_through(&a, 36..99);
    count_through(&b, 44..66);

    // Crash `a`: only `a` should be re-initialised.
    a.send(Panic);
    sleep(Duration::from_millis(10));
    let recorded = logger.request(TakeLogs);
    assert_eq!(recorded, [LogEvent::Panic('a'), LogEvent::Init('a')]);

    // `a` starts over at 33, `b` continues from 66.
    let (a, b) = sup.children();
    count_through(&a, 33..50);
    count_through(&b, 66..100);
}
// With `OneForAll`, a panic in either child is expected to shut down the
// sibling and restart both from their original arguments.
#[test]
fn two_failing_process_one_for_all() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A, A);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForAll);
            config.set_args(((33, 'a'), (44, 'b')));
        }
    }

    // Checks that `child` reports each value of `expected` in turn, sending
    // one `Inc` after every check.
    fn count_through(child: &ProcessRef<A>, expected: std::ops::Range<u32>) {
        for value in expected {
            assert_eq!(value, child.request(Count));
            child.send(Inc);
        }
    }

    let logger = Logger::link().start_as(&LOGGER_NAME, ()).unwrap();
    let sup = Sup::link().start(()).unwrap();

    let (a, b) = sup.children();
    count_through(&a, 33..36);
    count_through(&b, 44..88);

    // Crash `b`: `a` is shut down, then both are initialised again.
    b.send(Panic);
    sleep(Duration::from_millis(10));
    let recorded = logger.request(TakeLogs);
    assert_eq!(
        recorded,
        [
            LogEvent::Init('a'),
            LogEvent::Init('b'),
            LogEvent::Panic('b'),
            LogEvent::Shutdown('a'),
            LogEvent::Init('a'),
            LogEvent::Init('b'),
        ]
    );

    // Both counters are back at their start values.
    let (a, b) = sup.children();
    count_through(&a, 33..36);
    count_through(&b, 44..66);

    // Crash `a`: `b` is shut down, then both are initialised again.
    a.send(Panic);
    sleep(Duration::from_millis(10));
    let recorded = logger.request(TakeLogs);
    assert_eq!(
        recorded,
        [
            LogEvent::Panic('a'),
            LogEvent::Shutdown('b'),
            LogEvent::Init('a'),
            LogEvent::Init('b'),
        ]
    );

    let (a, b) = sup.children();
    count_through(&a, 33..50);
    count_through(&b, 44..66);
}
// Exercises `SupervisorStrategy::RestForOne` with four children: when a child
// panics, the children declared after it are expected to shut down in reverse
// order, and then the failed child and its successors are re-initialised in
// declaration order. Children declared before the failed one are untouched.
//
// NOTE: the function name says "rest_for_all", but the strategy configured
// below is `RestForOne`; the name is kept so existing test filters still match.
#[test]
fn four_failing_process_rest_for_all() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A, A, A, A);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::RestForOne);
            config.set_args(((33, 'a'), (44, 'b'), (55, 'c'), (66, 'd')));
        }
    }

    let logger = Logger::link().start_as(&LOGGER_NAME, ()).unwrap();
    let sup = Sup::link().start(()).unwrap();

    // Crash the second child: `c` and `d` follow it, `a` is left alone.
    let (_, b, _, _) = sup.children();
    b.send(Panic);
    sleep(Duration::from_millis(10));
    let logs = logger.request(TakeLogs);
    assert_eq!(
        logs,
        [
            LogEvent::Init('a'),
            LogEvent::Init('b'),
            LogEvent::Init('c'),
            LogEvent::Init('d'),
            LogEvent::Panic('b'),
            LogEvent::Shutdown('d'),
            LogEvent::Shutdown('c'),
            LogEvent::Init('b'),
            LogEvent::Init('c'),
            LogEvent::Init('d'),
        ]
    );

    // Crash the first child: every other child follows it.
    let (a, _, _, _) = sup.children();
    a.send(Panic);
    sleep(Duration::from_millis(10));
    let logs = logger.request(TakeLogs);
    assert_eq!(
        logs,
        [
            LogEvent::Panic('a'),
            LogEvent::Shutdown('d'),
            LogEvent::Shutdown('c'),
            LogEvent::Shutdown('b'),
            LogEvent::Init('a'),
            LogEvent::Init('b'),
            LogEvent::Init('c'),
            LogEvent::Init('d'),
        ]
    );

    // Crash the last child: nothing comes after it, so only `d` restarts.
    let (_, _, _, d) = sup.children();
    d.send(Panic);
    sleep(Duration::from_millis(10));
    let logs = logger.request(TakeLogs);
    assert_eq!(logs, [LogEvent::Panic('d'), LogEvent::Init('d')]);
}
// Starting a supervisor with a ten-element `Children` tuple is expected to
// succeed.
#[test]
fn ten_children_sup() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A, A, A, A, A, A, A, A, A, A);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForOne);
            // Every child gets the same start argument; the tuple is `Copy`.
            let blank = (0, ' ');
            config.set_args((
                blank, blank, blank, blank, blank, blank, blank, blank, blank, blank,
            ));
        }
    }

    Sup::link().start(()).unwrap();
}
// Starting a supervisor whose `init` never calls `set_args` is expected to
// panic, hence `#[should_panic]`.
#[test]
#[should_panic]
fn children_args_not_called() {
struct Sup;
impl Supervisor for Sup {
type Arg = ();
type Children = (A,);
fn init(config: &mut SupervisorConfig<Self>, _: ()) {
// Deliberately no `config.set_args(..)` here.
config.set_strategy(SupervisorStrategy::OneForOne);
}
}
Sup::link().start(()).unwrap();
}
// Shutting the supervisor down is expected to terminate the children in the
// reverse of the order in which they were initialised.
#[test]
fn shutdown() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A, A, A, A);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForOne);
            let children_args = ((0, 'a'), (0, 'b'), (0, 'c'), (0, 'd'));
            config.set_args(children_args);
        }
    }

    let logger = Logger::link().start_as(&LOGGER_NAME, ()).unwrap();
    let sup = Sup::link().start(()).unwrap();
    sup.shutdown();

    let recorded = logger.request(TakeLogs);
    assert_eq!(
        recorded,
        [
            LogEvent::Init('a'),
            LogEvent::Init('b'),
            LogEvent::Init('c'),
            LogEvent::Init('d'),
            LogEvent::Shutdown('d'),
            LogEvent::Shutdown('c'),
            LogEvent::Shutdown('b'),
            LogEvent::Shutdown('a'),
        ],
    );
}
// Children given names through `set_names` are expected to be reachable with
// `ProcessRef::lookup`, including after a restart.
#[test]
fn lookup_children() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A, A, A);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForOne);
            config.set_args(((0, ' '), (1, ' '), (2, ' ')));
            config.set_names((
                Some(String::from("first")),
                Some(String::from("second")),
                Some(String::from("third")),
            ));
        }
    }

    // Resolves a registered `A` process by name, panicking if it is missing.
    fn named(name: &str) -> ProcessRef<A> {
        ProcessRef::<A>::lookup(&name).unwrap()
    }

    Sup::link().start(()).unwrap();

    // Each name resolves to the child started with the matching count.
    let first = named("first");
    assert_eq!(first.request(Count), 0);
    let second = named("second");
    assert_eq!(second.request(Count), 1);
    let third = named("third");
    assert_eq!(third.request(Count), 2);

    third.send(Panic);
    sleep(Duration::from_millis(10));

    // The restarted child is registered under the same name and starts at 2.
    let third = named("third");
    third.send(Inc);
    third.send(Inc);
    assert_eq!(third.request(Count), 4);

    // A second lookup sees the same process and therefore the same count.
    let third = named("third");
    assert_eq!(third.request(Count), 4);
}
// `wait_on_shutdown` is called while another process shuts the supervisor
// down about 10 ms later; the test finishes once that call returns.
#[test]
fn wait_on_shutdown() {
struct Sup;
impl Supervisor for Sup {
type Arg = ();
type Children = (A,);
fn init(config: &mut SupervisorConfig<Self>, _: ()) {
config.set_strategy(SupervisorStrategy::OneForOne);
config.set_args(((0, ' '),));
}
}
let sup = Sup::link().start(()).unwrap();
// Keep a second handle for this process; `sup` itself is handed to the
// spawned process below.
let sup_cloned = sup.clone();
spawn!(|sup, _mailbox: Mailbox<()>| {
sleep(Duration::from_millis(10));
sup.shutdown();
});
sup_cloned.wait_on_shutdown()
}
// A `ProcessConfig` passed through `set_configs` is expected to apply to the
// child both on first start and after a restart.
#[test]
fn env_var_config() {
    struct Sup;
    impl Supervisor for Sup {
        type Arg = ();
        type Children = (A,);
        fn init(config: &mut SupervisorConfig<Self>, _: ()) {
            config.set_strategy(SupervisorStrategy::OneForOne);
            config.set_args(((0, ' '),));
            config.set_names((Some("named".to_owned()),));

            let mut child_config = ProcessConfig::new().unwrap();
            child_config.add_environment_variable("Hello", "world");
            config.set_configs((Some(child_config),));
        }
    }

    // Checks that `child` sees `Hello=world` and has no variable called `no`.
    fn assert_env(child: &ProcessRef<A>) {
        assert_eq!(
            child.request(GetEnvVar("Hello".to_string())),
            Some("world".to_string())
        );
        assert_eq!(child.request(GetEnvVar("no".to_string())), None);
    }

    Sup::link().start(()).unwrap();

    let named = ProcessRef::<A>::lookup(&"named").unwrap();
    assert_env(&named);

    named.send(Panic);
    sleep(Duration::from_millis(10));

    // Look the child up again after the restart and repeat the checks.
    let named = ProcessRef::<A>::lookup(&"named").unwrap();
    assert_env(&named);
}