#[cfg(all(test, feature = "tokio"))]
mod tests {
use crate::error::mocks::MockError;
use crate::integration::mocks::MockIntegration;
use crate::{
Daemon, MockConfigFormat, MockService, MockWatchdog, ReloadPolicy, RestartPolicy,
ServiceConfig,
};
use mockall::Sequence;
use mockall::predicate::eq;
use std::future::{pending, ready};
use std::path::PathBuf;
use std::time::Duration;
use tokio::time::{interval, timeout};
use tokio::{join, select};
fn mock_single_use_reload_trigger() -> (MockIntegration, Sequence) {
let mut sequence = Sequence::new();
let mut mock = MockIntegration::new();
mock.expect_wait_for_reload()
.times(1)
.in_sequence(&mut sequence)
.returning(|| Box::pin(ready(())));
mock.expect_wait_for_reload()
.times(1)
.in_sequence(&mut sequence)
.returning(|| Box::pin(pending()));
(mock, sequence)
}
fn mock_single_use_config_loading(config: &'static str) -> (MockConfigFormat, Sequence) {
let mut sequence = Sequence::new();
let mut mock = MockConfigFormat::new();
mock.expect_try_load()
.times(1)
.in_sequence(&mut sequence)
.returning(|_| Ok(config.to_string()));
mock.expect_try_load()
.times(1)
.in_sequence(&mut sequence)
.returning(|_| Err(MockError.into()));
(mock, sequence)
}
#[tokio::test]
async fn shutdown_trigger() {
let mut mock = MockIntegration::new();
mock.expect_wait_for_shutdown()
.times(1)
.returning(|| Box::pin(ready(())));
mock.expect_notify_stopping().times(1).returning(|| ());
let mocked_daemon = Daemon::new(mock);
assert!(!mocked_daemon.shutdown.is_cancelled());
mocked_daemon.shutdown_handler().await;
assert!(mocked_daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn shutdown_external_trigger() {
let mut mock = MockIntegration::new();
mock.expect_wait_for_shutdown()
.times(1)
.returning(|| Box::pin(pending()));
let mocked_daemon = Daemon::new(mock);
join! {
mocked_daemon.shutdown_handler(),
async {mocked_daemon.shutdown.cancel()}
};
}
#[tokio::test]
async fn watchdog_shutdown() {
let mut mock = MockWatchdog::new();
mock.expect_notify_interval().times(1).returning(|| None);
mock.expect_notify().times(0).returning(|| ());
let mocked_daemon = Daemon::<MockWatchdog>::new(mock);
join! {
mocked_daemon.watchdog_handler(),
async {
mocked_daemon.shutdown.cancel();
},
};
}
#[tokio::test]
async fn disabled_watchdog_is_pending() {
let mut mock = MockWatchdog::new();
mock.expect_notify_interval().times(1).returning(|| None);
mock.expect_notify().times(0).returning(|| ());
let mocked_daemon = Daemon::<MockWatchdog>::new(mock);
assert!(
timeout(Duration::from_millis(100), async {
mocked_daemon.watchdog_handler().await
})
.await
.is_err()
);
}
#[tokio::test]
async fn enabled_watchdog_is_triggering() {
let mut mock = MockWatchdog::new();
mock.expect_notify_interval()
.times(1)
.returning(|| Some(Duration::from_millis(50)));
mock.expect_notify().times(4..).returning(|| ());
let mocked_daemon = Daemon::<MockWatchdog>::new(mock);
assert!(
timeout(Duration::from_millis(100), async {
mocked_daemon.watchdog_handler().await
})
.await
.is_err()
);
}
#[tokio::test]
async fn reload_handler_reload_policy_keep_on_error() {
let (mut reload_trigger_mock, _) = mock_single_use_reload_trigger();
reload_trigger_mock
.expect_notify_reloading()
.times(1)
.returning(|| ());
let config_data = "non_default_config";
let (config_loading_mock, _) = mock_single_use_config_loading(config_data);
let config = ServiceConfig::try_load(&PathBuf::new(), config_loading_mock).unwrap();
let daemon = Daemon::new(reload_trigger_mock);
let mut config_update_receiver = config.receiver().clone();
select! {
_ = config_update_receiver.changed() => {
assert_eq!(config.get(), config_data);
},
_ = daemon.reload_handler(&config, ReloadPolicy::KeepOnError) => {
panic!("reload_handler must only return on shutdown token cancellation.");
}
}
}
#[tokio::test]
async fn reload_handler_reload_policy_default_on_error() {
let (mut reload_trigger_mock, _) = mock_single_use_reload_trigger();
reload_trigger_mock
.expect_notify_reloading()
.times(1)
.returning(|| ());
let (config_loading_mock, _) = mock_single_use_config_loading("non_default_config");
let config = ServiceConfig::try_load(&PathBuf::new(), config_loading_mock).unwrap();
let daemon = Daemon::new(reload_trigger_mock);
let mut config_update_receiver = config.receiver().clone();
select! {
_ = config_update_receiver.changed() => {
assert_eq!(config.get(), String::default());
},
_ = daemon.reload_handler(&config, ReloadPolicy::DefaultOnError) => {
panic!("reload_handler must only return on shutdown token cancellation.");
}
}
}
#[tokio::test]
async fn reload_handler_reload_policy_shutdown_on_error() {
let mut reload_trigger_mock = MockIntegration::new();
reload_trigger_mock
.expect_wait_for_reload()
.times(1)
.returning(|| Box::pin(ready(())));
reload_trigger_mock
.expect_notify_stopping()
.times(1)
.returning(|| ());
let (config_loading_mock, _) = mock_single_use_config_loading("non_default_config");
let config = ServiceConfig::try_load(&PathBuf::new(), config_loading_mock).unwrap();
let daemon = Daemon::new(reload_trigger_mock);
let mut config_update_receiver = config.receiver().clone();
select! {
_ = config_update_receiver.changed() => {
panic!("new configurations must not be received on shutdown.");
},
_ = daemon.reload_handler(&config, ReloadPolicy::ShutdownOnError) => {
assert!(daemon.shutdown.is_cancelled());
}
}
}
#[tokio::test]
async fn reload_handler_shutdown() {
let mut reload_trigger_mock = MockIntegration::new();
reload_trigger_mock
.expect_wait_for_reload()
.times(1)
.returning(|| Box::pin(pending()));
let mut config_loading_mock = MockConfigFormat::new();
config_loading_mock
.expect_try_load()
.returning(|_| Ok(String::default()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_loading_mock).unwrap();
let daemon = Daemon::new(reload_trigger_mock);
join! {
daemon.reload_handler(&config, ReloadPolicy::KeepOnError),
async {
daemon.shutdown.cancel();
},
};
}
#[tokio::test]
async fn reload_handler_reload_successfully() {
let mut reload_sequence = Sequence::new();
let mut reload_trigger_mock = MockIntegration::new();
reload_trigger_mock
.expect_wait_for_reload()
.times(1)
.in_sequence(&mut reload_sequence)
.returning(|| Box::pin(ready(())));
reload_trigger_mock
.expect_notify_reloading()
.times(1)
.in_sequence(&mut reload_sequence)
.returning(|| ());
reload_trigger_mock
.expect_wait_for_reload()
.times(1)
.in_sequence(&mut reload_sequence)
.returning(|| Box::pin(pending()));
let config1 = "Config1";
let config2 = "Config2";
let mut config_loading_sequence = Sequence::new();
let mut config_loading_mock = MockConfigFormat::new();
config_loading_mock
.expect_try_load()
.times(1)
.in_sequence(&mut config_loading_sequence)
.returning(|_| Ok(config1.to_string()));
config_loading_mock
.expect_try_load()
.times(1)
.in_sequence(&mut config_loading_sequence)
.returning(|_| Ok(config2.to_string()));
let daemon = Daemon::new(reload_trigger_mock);
let config = ServiceConfig::try_load(&PathBuf::new(), config_loading_mock).unwrap();
assert_eq!(config.get(), config1);
let mut config_update_receiver = config.receiver().clone();
select! {
_ = config_update_receiver.changed() => {
assert_eq!(config.get(), config2);
assert!(!daemon.shutdown.is_cancelled());
},
_ = daemon.reload_handler(&config, ReloadPolicy::DefaultOnError) => {
panic!("reload_handler must only return on shutdown token cancellation.");
}
}
}
#[tokio::test]
async fn service_handler_restart_policy_always_service_returns_ok() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut sequence = Sequence::new();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.in_sequence(&mut sequence)
.returning(|_| Box::pin(ready(Ok(()))));
service_mock
.expect_run()
.times(1)
.in_sequence(&mut sequence)
.returning(|_| Box::pin(pending()));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(2)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::Always)
.await
})
.await
.is_err()
);
assert!(!daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_always_service_returns_error() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut sequence = Sequence::new();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.in_sequence(&mut sequence)
.returning(|_| Box::pin(ready(Err(MockError))));
service_mock
.expect_run()
.times(1)
.in_sequence(&mut sequence)
.returning(|_| Box::pin(pending()));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(2)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::Always)
.await
})
.await
.is_err()
);
assert!(!daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_never_service_returns_okay() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(ready(Ok(()))));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(1)
.returning(|| ());
integration_mock
.expect_notify_stopping()
.times(1)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::Never)
.await
})
.await
.is_ok()
);
assert!(daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_never_service_returns_error() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(ready(Err(MockError))));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(1)
.returning(|| ());
integration_mock
.expect_notify_stopping()
.times(1)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::Never)
.await
})
.await
.is_ok()
);
assert!(daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_on_success_service_returns_okay() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(2)
.returning(|_| Box::pin(ready(Ok(()))));
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(pending()));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(3)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::OnSuccess)
.await
})
.await
.is_err()
);
assert!(!daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_on_success_service_returns_error() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(ready(Err(MockError))));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(1)
.returning(|| ());
integration_mock
.expect_notify_stopping()
.times(1)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::OnSuccess)
.await
})
.await
.is_ok()
);
assert!(daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_on_error_service_returns_okay() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(ready(Ok(()))));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(1)
.returning(|| ());
integration_mock
.expect_notify_stopping()
.times(1)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::OnError)
.await
})
.await
.is_ok()
);
assert!(daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_restart_policy_on_errpr_service_returns_error() {
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.times(1)
.returning(|_| Ok(String::new()));
let config = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(2)
.returning(|_| Box::pin(ready(Err(MockError))));
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(pending()));
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(3)
.returning(|| ());
let daemon = Daemon::new(integration_mock);
assert!(
timeout(Duration::from_millis(100), async {
daemon
.service_handler(&service_mock, &config, RestartPolicy::OnError)
.await
})
.await
.is_err()
);
assert!(!daemon.shutdown.is_cancelled());
}
#[tokio::test]
async fn service_handler_shutdown() {
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(1)
.returning(|| ());
let mut config_loading_mock = MockConfigFormat::new();
config_loading_mock
.expect_try_load()
.returning(|_| Ok(String::default()));
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.returning(|_| Box::pin(pending()));
let daemon = Daemon::new(integration_mock);
let config_mock = ServiceConfig::try_load(&PathBuf::new(), config_loading_mock).unwrap();
join! {
daemon.service_handler(&service_mock, &config_mock, RestartPolicy::Always),
async {
daemon.shutdown.cancel();
},
};
}
#[tokio::test]
async fn service_handler_restart_on_config_update() {
let mut integration_mock = MockIntegration::new();
integration_mock
.expect_notify_ready()
.times(3)
.returning(|| ());
let config1 = "Config 1";
let config2 = "Config 2";
let config3 = "Config 3";
let mut config_mock = MockConfigFormat::new();
config_mock
.expect_try_load()
.returning(|_| Ok(config1.to_string()));
let mut sequence = Sequence::new();
let mut service_mock = MockService::new();
service_mock
.expect_run()
.times(1)
.with(eq(config1.to_string()))
.in_sequence(&mut sequence)
.returning(|_| Box::pin(pending()));
service_mock
.expect_run()
.times(1)
.with(eq(config2.to_string()))
.in_sequence(&mut sequence)
.returning(|_| Box::pin(pending()));
service_mock
.expect_run()
.times(1)
.with(eq(config3.to_string()))
.in_sequence(&mut sequence)
.returning(|_| Box::pin(pending()));
let daemon = Daemon::new(integration_mock);
let config_mock = ServiceConfig::try_load(&PathBuf::new(), config_mock).unwrap();
join! {
daemon.service_handler(&service_mock, &config_mock, RestartPolicy::Always),
async {
let mut delay = interval(Duration::from_millis(50));
delay.tick().await;
config_mock.sender().send(config2.to_string()).unwrap();
delay.tick().await;
config_mock.sender().send(config3.to_string()).unwrap();
delay.tick().await;
daemon.shutdown.cancel();
},
};
}
}