#![deny(missing_docs)]
#[cfg(not(any(feature = "agnostic", feature = "tokio")))]
compile_error!("Neither feature 'agnostic' nor 'tokio' are enabled. Check this crates features.");
#[cfg(all(feature = "systemd", target_os = "linux"))]
use integration::systemd::SystemdIntegration as DefaultIntegration;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::error::Error;
use std::fmt::Debug;
use std::fs::{File, canonicalize};
use std::future::{Future, pending};
use std::io::{ErrorKind, Read, Write};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::time::Duration;
use tracing::{Level, debug, error, info, instrument, warn};
use crate::error::{LoadError, SaveError};
mod config_format;
mod error;
mod integration;
mod lib_agnostic_tests;
mod lib_tests;
mod lib_tokio_tests;
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
use async_watch::{Receiver, Sender, channel};
#[cfg(feature = "tokio")]
use tokio::sync::watch::{Receiver, Sender, channel};
/// Integration hook for setting up logging.
///
/// [`Daemon::setup_logger`] forwards to the implementation of this trait.
#[cfg_attr(test, mockall::automock)]
pub trait Logger {
/// Sets up logging for the given `level`.
///
/// How `level` is applied (presumably as the maximum verbosity) is up to the implementation.
fn setup_logger(&self, level: Level);
}
/// Integration hook for a service watchdog.
///
/// Driven by [`Daemon::watchdog_handler`].
#[cfg_attr(test, mockall::automock)]
pub trait Watchdog {
/// Returns the watchdog interval reported by the integration.
///
/// [`Daemon::watchdog_handler`] treats `None` as "watchdog disabled" and then never calls
/// [`Watchdog::notify`].
fn notify_interval(&self) -> Option<Duration>;
/// Sends a single notification to the watchdog.
///
/// Called periodically by [`Daemon::watchdog_handler`] while the watchdog is enabled.
fn notify(&self);
}
/// Integration hook that reports an external shutdown request.
#[cfg_attr(test, mockall::automock)]
pub trait Shutdown {
/// Returns a future that completes once a shutdown has been requested.
///
/// [`Daemon::shutdown_handler`] awaits this future and cancels all other handlers when it
/// completes.
fn wait_for_shutdown(&self) -> impl Future<Output = ()>;
}
/// Integration hook that reports an external request to reload the configuration.
#[cfg_attr(test, mockall::automock)]
pub trait Reload {
/// Returns a future that completes once a reload has been requested.
///
/// [`Daemon::reload_handler`] awaits this in a loop; every completion triggers one attempt
/// to reload the configuration file.
fn wait_for_reload(&self) -> impl Future<Output = ()>;
}
/// Integration hook for publishing the daemon's lifecycle state.
#[cfg_attr(test, mockall::automock)]
pub trait Notify {
/// Reports that the service is ready.
///
/// [`Daemon::service_handler`] calls this right before every (re)start of the service.
fn notify_ready(&self);
/// Reports that the configuration is being reloaded.
///
/// [`Daemon::reload_handler`] calls this right before it sends a configuration to the
/// service.
fn notify_reloading(&self);
/// Reports that the daemon is stopping.
///
/// The handlers of [`Daemon`] call this right before they cancel the shared shutdown token.
fn notify_stopping(&self);
}
/// A textual on-disk representation for configuration data.
///
/// Implementors only provide [`ConfigFormat::serialize`] and [`ConfigFormat::deserialize`];
/// the file handling in [`ConfigFormat::try_save`] and [`ConfigFormat::try_load`] is supplied
/// as default methods on top of them.
#[cfg_attr(test, mockall::automock(type Data=String;))]
pub trait ConfigFormat {
    /// The configuration type this format reads and writes.
    type Data: Serialize + DeserializeOwned;

    /// Serializes `data` and writes it to the file at `path`.
    ///
    /// The path is canonicalized first. If it does not exist yet it is used as given, so the
    /// file is created at that location; an existing file is truncated.
    ///
    /// # Errors
    ///
    /// Returns an error if canonicalizing fails for any reason other than the path not
    /// existing, if serialization fails, or if the file cannot be created or written.
    #[instrument(skip_all)]
    fn try_save(&self, path: &Path, data: &Self::Data) -> Result<(), SaveError> {
        debug!("Canonicalize path '{}'.", path.display());
        let target = match canonicalize(path) {
            Ok(resolved) => {
                debug!("Canonicalized path is '{}'", resolved.display());
                resolved
            }
            // A missing file is expected when the config is stored for the first time.
            Err(err) if err.kind() == ErrorKind::NotFound => {
                debug!(
                    "Failed to canonicalize path '{}', it does not exist (yet)",
                    path.display()
                );
                path.to_path_buf()
            }
            Err(err) => return Err(err.into()),
        };
        let serialized = self.serialize(data)?;
        let mut file = File::create(&target)?;
        file.write_all(serialized.as_bytes())?;
        debug!("Stored config successfully under '{}'", target.display());
        Ok(())
    }

    /// Reads the file at `path` and deserializes its content.
    ///
    /// # Errors
    ///
    /// Returns an error if the path cannot be canonicalized (which includes the file not
    /// existing), if the file cannot be opened or read as UTF-8 text, or if deserialization
    /// fails.
    #[instrument(skip_all)]
    fn try_load(&self, path: &Path) -> Result<Self::Data, LoadError> {
        debug!("Canonicalize path '{}'.", path.display());
        let resolved = canonicalize(path)?;
        debug!("Canonicalized path is '{}'", resolved.display());
        let mut contents = String::new();
        File::open(&resolved)?.read_to_string(&mut contents)?;
        let parsed = self.deserialize(&contents)?;
        debug!(
            "Loaded data successfully from path '{}'",
            resolved.display()
        );
        Ok(parsed)
    }

    /// Converts `data` into its textual representation.
    ///
    /// # Errors
    ///
    /// Returns an error if `data` cannot be represented in this format.
    fn serialize(&self, data: &Self::Data) -> Result<String, SaveError>;

    /// Parses `data` from its textual representation.
    ///
    /// # Errors
    ///
    /// Returns an error if `data` is not valid for this format.
    fn deserialize(&self, data: &str) -> Result<Self::Data, LoadError>;
}
/// The actual workload that a [`Daemon`] supervises.
#[cfg_attr(test, mockall::automock(type Config=String; type Error=crate::error::mocks::MockError;))]
pub trait Service {
/// Configuration handed to [`Service::run`] on every start.
type Config: Default + Clone + Serialize + DeserializeOwned + Debug;
/// Error with which [`Service::run`] may terminate.
type Error: Error;
/// Runs the service with the given `config` until it terminates.
///
/// [`Daemon::service_handler`] calls this with a clone of the current configuration and
/// decides via its [`RestartPolicy`] whether to call it again once the future resolves.
fn run(&self, config: Self::Config) -> impl Future<Output = Result<(), Self::Error>>;
}
/// Decides whether [`Daemon::service_handler`] starts the service again after it returned.
///
/// The policy is a plain value type: it is `Copy`, so it can be handed to a handler and still
/// be used afterwards.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RestartPolicy {
    /// Restart regardless of whether the service returned `Ok` or `Err`.
    #[default]
    Always,
    /// Restart only if the service returned `Ok`; stop if it returned an error.
    OnSuccess,
    /// Restart only if the service returned an error; stop if it returned `Ok`.
    OnError,
    /// Never restart; stop as soon as the service returns.
    Never,
}
/// Decides what [`Daemon::reload_handler`] does when reloading the configuration file fails.
///
/// The policy is a plain value type: it is `Copy`, so it can be handed to a handler and still
/// be used afterwards.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ReloadPolicy {
    /// Send the configuration that is currently in use again.
    #[default]
    KeepOnError,
    /// Send the default configuration instead.
    DefaultOnError,
    /// Stop reloading and shut the daemon down.
    ShutdownOnError,
}
/// Couples an integration `T` with a set of cooperating handlers.
///
/// Which handlers are available depends on the traits `T` implements ([`Logger`], [`Notify`],
/// [`Reload`], [`Watchdog`], [`Shutdown`]). All handlers share one cancellation token: when
/// one of them cancels it, the others stop as well.
pub struct Daemon<T> {
// Platform/service-manager integration all handlers delegate to.
integration: T,
// Shared cancellation source of the runtime-agnostic back end.
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
shutdown: cancellation_token::CancellationTokenSource,
// Shared cancellation token of the tokio back end.
#[cfg(feature = "tokio")]
shutdown: tokio_util::sync::CancellationToken,
}
/// Builds a daemon around the default integration.
///
/// `DefaultIntegration` is only imported when the `systemd` feature is enabled on Linux, so
/// this impl carries the same `cfg`; otherwise the crate would not compile on any other
/// feature/target combination.
#[cfg(all(feature = "systemd", target_os = "linux"))]
impl Default for Daemon<DefaultIntegration> {
    fn default() -> Self {
        Self::new(DefaultIntegration)
    }
}
impl<T> Daemon<T> {
pub fn new(integration: T) -> Self {
Self {
integration,
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
shutdown: cancellation_token::CancellationTokenSource::new(),
#[cfg(feature = "tokio")]
shutdown: tokio_util::sync::CancellationToken::new(),
}
}
}
impl<T> Daemon<T>
where
T: Logger,
{
/// Sets up logging for the given `level` by forwarding to [`Logger::setup_logger`] of the
/// integration.
pub fn setup_logger(&self, level: Level) {
self.integration.setup_logger(level);
}
}
impl<T> Daemon<T>
where
T: Notify,
{
/// Runs `service` and restarts it according to `policy` and on configuration changes.
///
/// In a loop, the handler takes the current configuration from `config`, calls
/// [`Notify::notify_ready`] and runs [`Service::run`] while also waiting for a configuration
/// change:
///
/// * If the service returns, `policy` decides whether the loop continues (restart) or ends.
/// * If the configuration changes first, the current run is abandoned and the loop starts
///   the service again with the new configuration.
///
/// When the loop ends because of `policy`, [`Notify::notify_stopping`] is called and the
/// shared shutdown token is cancelled so that all other handlers stop too. When the token is
/// cancelled by another handler, this handler simply returns.
///
/// # Panics
///
/// Panics if waiting for a configuration change fails because all senders were dropped.
#[instrument(skip_all)]
pub async fn service_handler<Srv>(
&self,
service: &Srv,
config: &ServiceConfig<Srv::Config, impl ConfigFormat<Data = Srv::Config>>,
policy: RestartPolicy,
) where
Srv: Service,
{
// Work on an own clone of the receiver; `config` itself is only borrowed immutably.
let mut receiver = config.receiver().clone();
// Runtime-agnostic variant of the supervision loop.
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
let task = async {
use futures::FutureExt;
loop {
let service_config = receiver.borrow().clone();
self.integration.notify_ready();
info!("Starting service...");
futures::select! {
res = service.run(service_config).fuse() => {
match (&policy, res) {
(RestartPolicy::Always, Ok(())) | (RestartPolicy::OnSuccess, Ok(())) => {
info!("Service returned, restart due to policy '{policy:?}'.");
}
(RestartPolicy::Always, Err(err)) | (RestartPolicy::OnError, Err(err)) => {
warn!(
"Service returned with error, restart due to policy '{policy:?}'. Error was: '{err}'"
);
}
(RestartPolicy::Never, Ok(())) | (RestartPolicy::OnError, Ok(())) => {
info!("Service returned, stop due to policy '{policy:?}'.");
break;
}
(RestartPolicy::Never, Err(err)) | (RestartPolicy::OnSuccess, Err(err)) => {
warn!(
"Service returned with error, stop due to policy '{policy:?}'. Error was: '{err}'"
);
break;
}
}
}
res = receiver.changed().fuse() => {
res.expect("All senders are dropped. This is a bug.");
info!("Configuration changed. Restart service...");
}
}
}
};
// Tokio variant of the same supervision loop.
#[cfg(feature = "tokio")]
let task = async {
loop {
let service_config = receiver.borrow_and_update().clone();
self.integration.notify_ready();
info!("Starting service...");
tokio::select! {
res = service.run(service_config) => {
match (&policy, res) {
(RestartPolicy::Always, Ok(())) | (RestartPolicy::OnSuccess, Ok(())) => {
info!("Service returned, restart due to policy '{policy:?}'.");
},
(RestartPolicy::Always, Err(err)) | (RestartPolicy::OnError, Err(err)) => {
warn!("Service returned with error, restart due to policy '{policy:?}'. Error was: '{err}'");
},
(RestartPolicy::Never, Ok(())) | (RestartPolicy::OnError, Ok(())) => {
info!("Service returned, stop due to policy '{policy:?}'.");
break;
},
(RestartPolicy::Never, Err(err)) | (RestartPolicy::OnSuccess, Err(err)) => {
warn!("Service returned with error, stop due to policy '{policy:?}'. Error was: '{err}'");
break;
},
}
},
res = receiver.changed() => {
res.expect("All senders are dropped. This is a bug.");
info!("Configuration changed. Restart service...");
}
}
}
};
// `Some(())`: the loop ended on its own (policy said stop); `None`: the token was cancelled.
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
let outcome = self.shutdown.token().run(task).await.ok();
#[cfg(feature = "tokio")]
let outcome = self.shutdown.run_until_cancelled(task).await;
match outcome {
Some(()) => {
info!("Distribute shutdown event via cancellation token...");
self.integration.notify_stopping();
self.shutdown.cancel();
}
None => {
info!("Task was canceled via token. Shutting down...");
}
}
}
}
impl<T> Daemon<T>
where
T: Reload + Notify,
{
#[instrument(skip_all)]
pub async fn reload_handler<Data, Fmt>(
&self,
config: &ServiceConfig<Data, Fmt>,
policy: ReloadPolicy,
) where
Data: Default + DeserializeOwned + Serialize + Debug + Clone,
Fmt: ConfigFormat<Data = Data>,
{
let task = async {
loop {
info!("Waiting for reload event.");
self.integration.wait_for_reload().await;
info!(
"Received reload event, try to reload config from '{}'.",
config.file.display()
);
let data = match config.file_format.try_load(&config.file) {
Ok(data) => {
info!("Loaded config successfully.");
data
}
Err(err) => match policy {
ReloadPolicy::KeepOnError => {
warn!(
"Failed to load config, re-use current config due to policy '{policy:?}'. Error was: '{err}'"
);
config.get()
}
ReloadPolicy::DefaultOnError => {
warn!(
"Failed to load config, use default config due to policy '{policy:?}'. Error was: '{err}'"
);
Data::default()
}
ReloadPolicy::ShutdownOnError => {
warn!(
"Failed to load config, stop due to policy '{policy:?}'. Error was: '{err}'"
);
break;
}
},
};
info!("Send configuration '{data:?}' to service");
self.integration.notify_reloading();
config.config_sender.send(data).unwrap();
}
};
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
let outcome = self.shutdown.token().run(task).await.ok();
#[cfg(feature = "tokio")]
let outcome = self.shutdown.run_until_cancelled(task).await;
match outcome {
Some(()) => {
info!("Distribute shutdown event via cancellation token...");
self.integration.notify_stopping();
self.shutdown.cancel();
}
None => {
info!("Task was canceled via token. Shutting down...");
}
}
}
}
impl<T> Daemon<T>
where
    T: Watchdog,
{
    /// Periodically notifies the watchdog of the integration until the daemon shuts down.
    ///
    /// If [`Watchdog::notify_interval`] returns `None`, the watchdog is disabled and the
    /// handler only waits for the shutdown token. Otherwise [`Watchdog::notify`] is called
    /// with a period of half the reported interval minus a small safety margin. For intervals
    /// too short to subtract the margin from, half the interval is used as is, and the period
    /// never drops to zero.
    ///
    /// The handler returns once the shared shutdown token is cancelled.
    ///
    /// # Panics
    ///
    /// Panics if the notification task terminates on its own, which would be a bug.
    #[instrument(skip_all)]
    pub async fn watchdog_handler(&self) {
        // Slack subtracted from half the watchdog interval so notifications arrive early.
        const SAFETY_MARGIN: Duration = Duration::from_micros(10);
        // Lower bound for the period: a zero period makes `tokio::time::interval` panic.
        const MIN_PERIOD: Duration = Duration::from_nanos(1);

        let task: Pin<Box<dyn Future<Output = ()>>> = match self.integration.notify_interval() {
            None => {
                info!("Watchdog disabled, do nothing.");
                Box::pin(pending())
            }
            Some(trigger_interval) => {
                let half = trigger_interval / 2;
                // `Duration` subtraction panics on underflow, so subtract checked and fall
                // back to the plain half interval when the margin does not fit.
                let used_interval = half
                    .checked_sub(SAFETY_MARGIN)
                    .filter(|period| !period.is_zero())
                    .unwrap_or(half)
                    .max(MIN_PERIOD);
                info!(
                    "Watchdog enabled, received interval is {trigger_interval:?}, used interval is {used_interval:?}."
                );
                let task = async move {
                    #[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
                    let mut interval = async_timer::Interval::platform_new(used_interval);
                    #[cfg(feature = "tokio")]
                    let mut interval = tokio::time::interval(used_interval);
                    loop {
                        self.integration.notify();
                        debug!("Watchdog triggered.");
                        #[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
                        interval.as_mut().await;
                        #[cfg(feature = "tokio")]
                        interval.tick().await;
                    }
                };
                Box::pin(task)
            }
        };
        // `None` means the token was cancelled, which is the only legitimate way to get here.
        #[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
        let outcome = self.shutdown.token().run(task).await.ok();
        #[cfg(feature = "tokio")]
        let outcome = self.shutdown.run_until_cancelled(task).await;
        match outcome {
            Some(()) => {
                panic!("This task must never terminate on its own. This is a bug");
            }
            None => {
                info!("Task was canceled via token. Shutting down...");
            }
        }
    }
}
impl<T> Daemon<T>
where
T: Shutdown + Notify,
{
#[instrument(skip_all)]
pub async fn shutdown_handler(&self) {
info!("Waiting for shutdown event.");
#[cfg(all(not(feature = "tokio"), feature = "agnostic"))]
let outcome = self
.shutdown
.token()
.run(self.integration.wait_for_shutdown())
.await
.ok();
#[cfg(feature = "tokio")]
let outcome = self
.shutdown
.run_until_cancelled(self.integration.wait_for_shutdown())
.await;
match outcome {
Some(()) => {
info!(
"Received shutdown event. Distribute shutdown event via cancellation token..."
);
self.integration.notify_stopping();
self.shutdown.cancel();
}
None => {
info!("Task was canceled via token. Shutting down...");
}
}
}
}
/// A service configuration together with the file it is stored in and its file format.
///
/// The configuration value lives in a watch channel, so handlers can publish a new value and
/// observe changes.
pub struct ServiceConfig<Data, Format> {
// Path of the configuration file, as given to the constructor.
file: PathBuf,
// Format used to load and store `file`.
file_format: Format,
// Sending half of the watch channel holding the current configuration.
config_sender: Sender<Data>,
// Receiving half of the watch channel holding the current configuration.
config_receiver: Receiver<Data>,
}
impl<Data, Format> ServiceConfig<Data, Format> {
/// Returns the sending half of the configuration channel.
pub fn sender(&self) -> &Sender<Data> {
&self.config_sender
}
/// Returns the receiving half of the configuration channel.
pub fn receiver(&self) -> &Receiver<Data> {
&self.config_receiver
}
/// Returns the receiving half of the configuration channel for mutable access.
pub fn receiver_mut(&mut self) -> &mut Receiver<Data> {
&mut self.config_receiver
}
}
impl<Data, Format> ServiceConfig<Data, Format>
where
Data: Clone,
{
/// Returns a clone of the configuration currently stored in the channel.
pub fn get(&self) -> Data {
self.config_receiver.borrow().clone()
}
}
impl<Data, Format> ServiceConfig<Data, Format>
where
    Data: Default + Serialize + DeserializeOwned + Debug,
    Format: ConfigFormat<Data = Data>,
{
    /// Loads the configuration from `file` using `file_format`.
    ///
    /// If the file does not exist, the default configuration is written to `file` and used.
    ///
    /// # Errors
    ///
    /// Returns an error if the file cannot be read or deserialized, or if it does not exist
    /// and the default configuration cannot be stored.
    #[instrument(skip_all)]
    pub fn try_load(file: &Path, file_format: Format) -> Result<Self, LoadError> {
        Self::try_load_data(file, &file_format)
            .map(|data| Self::from_parts(file, file_format, data))
    }

    /// Loads the configuration from `file` using `file_format`, falling back to the default
    /// configuration if loading fails for any reason.
    #[instrument(skip_all)]
    pub fn try_load_or_default(file: &Path, file_format: Format) -> Self {
        let data = match Self::try_load_data(file, &file_format) {
            Ok(data) => data,
            Err(_) => {
                let data = Data::default();
                warn!(
                    "Failed to load config. Use default configuration instead. Config is {data:?}."
                );
                data
            }
        };
        Self::from_parts(file, file_format, data)
    }

    /// Assembles a `ServiceConfig` whose channel initially holds `data`.
    fn from_parts(file: &Path, file_format: Format, data: Data) -> Self {
        let (config_sender, config_receiver) = channel(data);
        Self {
            file: file.to_path_buf(),
            file_format,
            config_sender,
            config_receiver,
        }
    }

    /// Loads the configuration data from `file`, creating the file with the default
    /// configuration if it does not exist yet.
    #[instrument(skip_all)]
    fn try_load_data(file: &Path, file_format: &Format) -> Result<Data, LoadError> {
        info!("Trying to load configuration from '{}'.", file.display());
        // Only an I/O error gets past this match; everything else returns right away.
        let err = match file_format.try_load(file) {
            Ok(data) => {
                info!("Loaded config successfully. Config is {data:?}.");
                return Ok(data);
            }
            Err(LoadError::Deserialize(err)) => {
                error!("Failed to deserialize config. Error was: '{err}'");
                return Err(LoadError::from(err));
            }
            Err(LoadError::Io(err)) => err,
        };

        warn!("Failed to load config from IO error. Error was: '{err}'.");
        if err.kind() != ErrorKind::NotFound {
            error!("Failed to load config. Error was: '{err}'");
            return Err(LoadError::from(err));
        }

        info!("Config file was not found. Try to store default at given location.");
        let data = Data::default();
        match file_format.try_save(file, &data) {
            Ok(()) => {
                info!("Default config successfully created.");
                Ok(data)
            }
            // The original "not found" error is reported, not the save error.
            Err(_) => {
                error!("Failed to store default config.");
                Err(LoadError::from(err))
            }
        }
    }
}
/// Exposes the path of the configuration file.
impl<Data, Format> AsRef<Path> for ServiceConfig<Data, Format> {
    fn as_ref(&self) -> &Path {
        self.file.as_path()
    }
}
#[cfg(feature = "json")]
impl<Data> ServiceConfig<Data, config_format::Json<Data>>
where
Data: Default + Serialize + DeserializeOwned + Debug,
{
/// Loads the configuration from `file` using the JSON format.
///
/// Shorthand for [`ServiceConfig::try_load`] with a default-constructed JSON format.
///
/// # Errors
///
/// Returns whatever error [`ServiceConfig::try_load`] returns.
#[instrument(skip_all)]
pub fn try_load_json(file: &Path) -> Result<Self, LoadError> {
Self::try_load(file, config_format::Json::<Data>::default())
}
/// Loads the configuration from `file` using the JSON format, falling back to the default
/// configuration on failure.
///
/// Shorthand for [`ServiceConfig::try_load_or_default`] with a default-constructed JSON
/// format.
#[instrument(skip_all)]
pub fn try_load_json_or_default(file: &Path) -> Self {
Self::try_load_or_default(file, config_format::Json::<Data>::default())
}
}
#[cfg(feature = "toml")]
impl<Data> ServiceConfig<Data, config_format::Toml<Data>>
where
Data: Default + Serialize + DeserializeOwned + Debug,
{
/// Loads the configuration from `file` using the TOML format.
///
/// Shorthand for [`ServiceConfig::try_load`] with a default-constructed TOML format.
///
/// # Errors
///
/// Returns whatever error [`ServiceConfig::try_load`] returns.
#[instrument(skip_all)]
pub fn try_load_toml(file: &Path) -> Result<Self, LoadError> {
Self::try_load(file, config_format::Toml::<Data>::default())
}
/// Loads the configuration from `file` using the TOML format, falling back to the default
/// configuration on failure.
///
/// Shorthand for [`ServiceConfig::try_load_or_default`] with a default-constructed TOML
/// format.
#[instrument(skip_all)]
pub fn try_load_toml_or_default(file: &Path) -> Self {
Self::try_load_or_default(file, config_format::Toml::<Data>::default())
}
}
#[cfg(feature = "yaml")]
impl<Data> ServiceConfig<Data, config_format::Yaml<Data>>
where
Data: Default + Serialize + DeserializeOwned + Debug,
{
/// Loads the configuration from `file` using the YAML format.
///
/// Shorthand for [`ServiceConfig::try_load`] with a default-constructed YAML format.
///
/// # Errors
///
/// Returns whatever error [`ServiceConfig::try_load`] returns.
#[instrument(skip_all)]
pub fn try_load_yaml(file: &Path) -> Result<Self, LoadError> {
Self::try_load(file, config_format::Yaml::<Data>::default())
}
/// Loads the configuration from `file` using the YAML format, falling back to the default
/// configuration on failure.
///
/// Shorthand for [`ServiceConfig::try_load_or_default`] with a default-constructed YAML
/// format.
#[instrument(skip_all)]
pub fn try_load_yaml_or_default(file: &Path) -> Self {
Self::try_load_or_default(file, config_format::Yaml::<Data>::default())
}
}