use std::any::{Any, TypeId};
use std::collections::{HashMap, HashSet};
use std::marker::PhantomData;
use std::mem;
use std::panic::{self, AssertUnwindSafe};
use std::path::{Path, PathBuf};
use std::process;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use arc_swap::{ArcSwap, Lease};
use failure::{Error, Fail, ResultExt};
use log::{debug, error, info, trace};
use parking_lot::Mutex;
use serde::de::DeserializeOwned;
use signal_hook::iterator::Signals;
use structopt::StructOpt;
use crate::app::App;
use crate::bodies::{InnerBody, SpiritBody, WrapBody, Wrapper};
use crate::cfg_loader::{Builder as CfgBuilder, ConfigBuilder, Loader as CfgLoader};
use crate::empty::Empty;
use crate::extension::{Extensible, Extension};
use crate::fragment::pipeline::MultiError;
use crate::utils;
use crate::validation::Action;
/// Error returned by `Spirit::config_reload` when a new configuration is rejected.
///
/// The first field is the total number of individual errors that were reported, the second one
/// is the number of validators that failed. Both are interpolated into the display message.
#[derive(Debug, Fail)]
#[fail(
display = "Config validation failed with {} errors from {} validators",
_0, _1
)]
pub struct ValidationError(usize, usize);
/// All the callbacks and bookkeeping a `Spirit` keeps behind its single hooks mutex.
struct Hooks<O, C> {
/// Callbacks invoked with the options and the new configuration after it has been stored.
config: Vec<Box<dyn FnMut(&O, &Arc<C>) + Send>>,
/// Loader used to produce a fresh configuration on every reload.
config_loader: CfgLoader,
/// Callbacks that may modify a freshly loaded configuration before it is validated.
config_mutators: Vec<Box<dyn FnMut(&mut C) + Send>>,
/// Validators called with `(old, new, opts)`; each yields an `Action` or an error.
config_validators: Vec<Box<dyn FnMut(&Arc<C>, &Arc<C>, &O) -> Result<Action, Error> + Send>>,
/// Callbacks to run when a given signal number is received, keyed by that number.
sigs: HashMap<libc::c_int, Vec<Box<dyn FnMut() + Send>>>,
/// Types already registered through `singleton`.
singletons: HashSet<TypeId>,
/// Callbacks run by `Spirit::terminate`; the vector is emptied before they are called.
terminate: Vec<Box<dyn FnMut() + Send>>,
/// Opaque values that are only stored so they stay alive; they are never inspected here.
guards: Vec<Box<dyn Any + Send>>,
/// Set once `Spirit::terminate` has finished; most runtime registrations are skipped afterwards.
terminated: bool,
}
impl<O, C> Default for Hooks<O, C> {
    /// Produces a hook set with nothing registered and `terminated` unset.
    ///
    /// The loader comes from `CfgBuilder::new().build_no_opts()`; every collection is empty.
    fn default() -> Self {
        // The loader is the only field that is not a plain empty container or flag.
        let config_loader = CfgBuilder::new().build_no_opts();
        Self {
            config_loader,
            terminated: false,
            config: Vec::new(),
            config_mutators: Vec::new(),
            config_validators: Vec::new(),
            terminate: Vec::new(),
            guards: Vec::new(),
            sigs: HashMap::new(),
            singletons: HashSet::new(),
        }
    }
}
/// The runtime handle: owns the current configuration, the parsed options and all hooks.
///
/// `O` is the options type and `C` the configuration type; both default to `Empty`.
pub struct Spirit<O = Empty, C = Empty> {
/// The currently active configuration; `config_reload` stores a new one after validation.
config: ArcSwap<C>,
/// Every registered callback plus related bookkeeping, guarded by one mutex.
hooks: Mutex<Hooks<O, C>>,
/// The options value handed out by `cmd_opts`.
opts: O,
/// Set by `terminate`, read by `is_terminated`.
terminate: AtomicBool,
/// Read by `should_autojoin`.
autojoin_bg_thread: AtomicBool,
/// Signal iterator handle; `None` when built without a background thread.
signals: Option<Signals>,
/// Join handle of the background thread, taken out by `join_bg_thread`.
bg_thread: Mutex<Option<JoinHandle<()>>>,
}
impl<O, C> Spirit<O, C>
where
C: DeserializeOwned + Send + Sync,
O: StructOpt,
{
#[allow(clippy::new_ret_no_self)]
pub fn new() -> Builder<O, C>
where
C: Default,
{
Spirit::with_initial_config(C::default())
}
pub fn with_initial_config(config: C) -> Builder<O, C> {
Builder {
autojoin_bg_thread: false,
before_bodies: Vec::new(),
before_config: Vec::new(),
body_wrappers: Vec::new(),
config,
config_loader: CfgBuilder::new(),
config_hooks: Vec::new(),
config_mutators: Vec::new(),
config_validators: Vec::new(),
opts: PhantomData,
sig_hooks: HashMap::new(),
singletons: HashSet::new(),
terminate_hooks: Vec::new(),
guards: Vec::new(),
}
}
pub fn cmd_opts(&self) -> &O {
&self.opts
}
pub fn config(&self) -> Lease<Arc<C>> {
self.config.lease()
}
pub fn config_reload(&self) -> Result<(), Error> {
let mut new = self.load_config().context("Failed to load configuration")?;
let mut hooks = self.hooks.lock();
debug!("Running {} config mutators", hooks.config_mutators.len());
for m in &mut hooks.config_mutators {
m(&mut new);
}
let new = Arc::new(new);
let old = self.config.load();
debug!(
"Running {} config validators",
hooks.config_validators.len()
);
let mut errors = 0;
let mut failed_validators = 0;
let mut actions = Vec::with_capacity(hooks.config_validators.len());
for v in hooks.config_validators.iter_mut() {
match v(&old, &new, &self.opts) {
Ok(ac) => actions.push(ac),
Err(e) => {
failed_validators += 1;
match e.downcast::<MultiError>() {
Ok(e) => {
error!("{}", e);
errors += e.errors.len();
for e in e.errors {
crate::log_error!(multi Error, e);
}
}
Err(e) => {
errors += 1;
crate::log_error!(multi Error, e);
}
}
}
}
}
if errors == 0 {
debug!("Validation successful, switching to new config");
for a in actions {
a.run(true);
}
} else {
debug!("Rolling back validation attempt");
for a in actions {
a.run(false);
}
return Err(ValidationError(errors, failed_validators).into());
}
self.config.store(Arc::clone(&new));
debug!("Running {} post-configuration hooks", hooks.config.len());
for hook in &mut hooks.config {
hook(&self.opts, &new);
}
debug!("Configuration reloaded");
Ok(())
}
pub fn is_terminated(&self) -> bool {
self.terminate.load(Ordering::Relaxed)
}
pub fn terminate(&self) {
debug!("Running termination hooks");
if let Some(signals) = &self.signals {
signals.close();
}
let mut hooks = self.hooks.lock();
let mut term_hooks = Vec::new();
mem::swap(&mut term_hooks, &mut hooks.terminate);
for hook in &mut term_hooks {
hook();
}
self.terminate.store(true, Ordering::Relaxed);
let mut guards = Vec::new();
mem::swap(&mut guards, &mut hooks.guards);
let mut singletons = HashSet::new();
mem::swap(&mut singletons, &mut hooks.singletons);
*hooks = Hooks::default();
hooks.guards = guards;
hooks.singletons = singletons;
hooks.terminated = true;
}
fn background(&self, signals: &Signals) {
debug!("Starting background processing");
for signal in signals.forever() {
debug!("Received signal {}", signal);
let term = match signal {
libc::SIGHUP => {
let _ = utils::log_errors(module_path!(), || self.config_reload());
false
}
libc::SIGTERM | libc::SIGINT | libc::SIGQUIT => {
self.terminate();
true
}
_ => false,
};
if let Some(hooks) = self.hooks.lock().sigs.get_mut(&signal) {
for hook in hooks {
hook();
}
}
if term {
break;
}
}
debug!("Terminating the background thread");
}
fn load_config(&self) -> Result<C, Error> {
self.hooks.lock().config_loader.load()
}
pub fn join_bg_thread(&self) {
if let Some(handle) = self.bg_thread.lock().take() {
handle
.join()
.expect("Spirit BG thread handles panics, shouldn't panic itself");
}
}
pub(crate) fn should_autojoin(&self) -> bool {
self.autojoin_bg_thread.load(Ordering::Relaxed)
}
}
/// Runtime flavour of [`Extensible`]: registers things on an already built spirit.
///
/// Unless noted otherwise, registrations made after termination are silently dropped.
impl<O, C> Extensible for &Arc<Spirit<O, C>>
where
    C: DeserializeOwned + Send + Sync,
    O: StructOpt,
{
    type Opts = O;
    type Config = C;
    type Ok = Self;
    const STARTED: bool = true;

    /// Runs `cback` right away with the current configuration and options.
    ///
    /// # Errors
    ///
    /// Propagates whatever `cback` returns.
    fn before_config<F>(self, cback: F) -> Result<Self, Error>
    where
        F: FnOnce(&Self::Config, &Self::Opts) -> Result<(), Error> + Send + 'static,
    {
        trace!("Running just added before_config");
        cback(&self.config(), self.cmd_opts())?;
        Ok(self)
    }

    /// Registers a validator after first running it against the current configuration.
    ///
    /// The validator is called once with the current configuration as both old and new; the
    /// returned action is committed immediately. The hooks lock is held during that call.
    ///
    /// # Errors
    ///
    /// Returns the validator's error; the validator is not registered in that case.
    fn config_validator<F>(self, mut f: F) -> Result<Self, Error>
    where
        F: FnMut(&Arc<C>, &Arc<C>, &O) -> Result<Action, Error> + Send + 'static,
    {
        trace!("Adding config validator at runtime");
        let mut hooks = self.hooks.lock();
        if !hooks.terminated {
            let cfg = Lease::into_upgrade(self.config());
            f(&cfg, &cfg, self.cmd_opts())?.run(true);
            hooks.config_validators.push(Box::new(f));
        }
        Ok(self)
    }

    /// Registers a mutator to be applied to configurations loaded from now on.
    fn config_mutator<F>(self, f: F) -> Self
    where
        F: FnMut(&mut C) + Send + 'static,
    {
        trace!("Adding config mutator at runtime");
        let mut hooks = self.hooks.lock();
        if !hooks.terminated {
            hooks.config_mutators.push(Box::new(f));
        }
        self
    }

    /// Registers a configuration hook and calls it once with the current configuration.
    ///
    /// The hooks lock is held while the hook runs.
    fn on_config<F: FnMut(&O, &Arc<C>) + Send + 'static>(self, mut hook: F) -> Self {
        trace!("Adding config hook at runtime");
        let mut hooks = self.hooks.lock();
        if !hooks.terminated {
            hook(self.cmd_opts(), &Lease::upgrade(&self.config()));
            hooks.config.push(Box::new(hook));
        }
        self
    }

    /// Starts watching `signal` and registers `hook` for it.
    ///
    /// # Errors
    ///
    /// Returns the error from adding the signal to the watched set.
    ///
    /// # Panics
    ///
    /// Panics if the spirit was built without a signal thread.
    fn on_signal<F>(self, signal: libc::c_int, hook: F) -> Result<Self, Error>
    where
        F: FnMut() + Send + 'static,
    {
        let signals = self
            .signals
            .as_ref()
            .expect("Signals thread disabled by caller");
        trace!("Adding signal hook at runtime");
        signals.add_signal(signal)?;
        let mut hooks = self.hooks.lock();
        if !hooks.terminated {
            hooks
                .sigs
                .entry(signal)
                .or_insert_with(Vec::new)
                .push(Box::new(hook));
        }
        Ok(self)
    }

    /// Registers a termination hook, or runs it right away if termination already happened.
    ///
    /// After `terminate` the stored list of termination hooks is always empty, so the hook
    /// cannot be fetched back from there; it is invoked directly instead.
    fn on_terminate<F: FnOnce() + Send + 'static>(self, hook: F) -> Self {
        trace!("Running termination hook at runtime");
        let mut hooks = self.hooks.lock();
        if hooks.terminated {
            // Nothing is going to be stored, so release the lock before running user code; the
            // mutex is not reentrant and the hook may want to touch the spirit.
            drop(hooks);
            hook();
        } else {
            // Adapt the `FnOnce` to the `FnMut` the storage expects.
            let mut hook = Some(hook);
            hooks.terminate.push(Box::new(move || {
                (hook.take().expect("Termination hook called multiple times"))()
            }));
        }
        self
    }

    /// Applies the extension to this spirit.
    fn with<E>(self, ext: E) -> Result<Self::Ok, Error>
    where
        E: Extension<Self>,
    {
        ext.apply(self)
    }

    /// Records `T` as registered; returns `true` only if it was not recorded before.
    fn singleton<T: 'static>(&mut self) -> bool {
        self.hooks.lock().singletons.insert(TypeId::of::<T>())
    }

    /// Runs `body` immediately, as the spirit is already started.
    fn run_before<B>(self, body: B) -> Result<Self, Error>
    where
        B: FnOnce(&Arc<Spirit<Self::Opts, Self::Config>>) -> Result<(), Error> + Send + 'static,
    {
        body(self).map(|()| self)
    }

    /// Not supported at runtime.
    ///
    /// # Panics
    ///
    /// Always panics; wrappers have to be registered on the builder.
    fn run_around<W>(self, _wrapper: W) -> Result<Self, Error>
    where
        W: FnOnce(&Arc<Spirit<Self::Opts, Self::Config>>, InnerBody) -> Result<(), Error>
            + Send
            + 'static,
    {
        panic!("Wrapping body while already running is not possible, move this to the builder (see https://docs.rs/spirit/*/extension/trait.Extensible.html#method.run_around");
    }

    /// Applies `singleton` only the first time its type is seen.
    fn with_singleton<T>(mut self, singleton: T) -> Result<Self, Error>
    where
        T: Extension<Self::Ok> + 'static,
    {
        if self.singleton::<T>() {
            self.with(singleton)
        } else {
            Ok(self)
        }
    }

    /// Stores `guard` so it stays alive together with the hooks.
    fn keep_guard<G: Any + Send>(self, guard: G) -> Self {
        self.hooks.lock().guards.push(Box::new(guard));
        self
    }

    /// Turns the auto-join flag on.
    fn autojoin_bg_thread(self) -> Self {
        self.autojoin_bg_thread.store(true, Ordering::Relaxed);
        self
    }
}
/// Collects everything needed to construct a `Spirit`; consumed by `build` or `run`.
#[must_use = "The builder is inactive without calling `run` or `build`"]
pub struct Builder<O = Empty, C = Empty> {
/// Initial value of the spirit's auto-join flag.
autojoin_bg_thread: bool,
/// Bodies that `build` chains, in order, into the inner body.
before_bodies: Vec<SpiritBody<O, C>>,
/// Callbacks run by `build` with the initial configuration and the options.
before_config: Vec<Box<dyn FnMut(&C, &O) -> Result<(), Error> + Send>>,
/// Wrappers registered through `run_around`.
body_wrappers: Vec<Wrapper<O, C>>,
/// The initial configuration.
config: C,
/// Builder of the configuration loader; `build` turns it into the options and the loader.
config_loader: CfgBuilder,
/// Hooks to call with each newly stored configuration.
config_hooks: Vec<Box<dyn FnMut(&O, &Arc<C>) + Send>>,
/// Mutators applied to each freshly loaded configuration.
config_mutators: Vec<Box<dyn FnMut(&mut C) + Send>>,
/// Validators consulted on each reload.
config_validators: Vec<Box<dyn FnMut(&Arc<C>, &Arc<C>, &O) -> Result<Action, Error> + Send>>,
/// Only ties the options type to the builder; no value is stored.
opts: PhantomData<O>,
/// Signal hooks keyed by signal number.
sig_hooks: HashMap<libc::c_int, Vec<Box<dyn FnMut() + Send>>>,
/// Types already registered through `singleton`.
singletons: HashSet<TypeId>,
/// Hooks to run on termination.
terminate_hooks: Vec<Box<dyn FnMut() + Send>>,
/// Opaque values that are only kept alive.
guards: Vec<Box<dyn Any + Send>>,
}
impl<O, C> Builder<O, C>
where
    C: DeserializeOwned + Send + Sync + 'static,
    O: StructOpt + Sync + Send + 'static,
{
    /// Replaces the configuration loader builder with `loader`, discarding the previous one.
    pub fn config_loader(mut self, loader: CfgBuilder) -> Self {
        self.config_loader = loader;
        self
    }
}
/// Every method here forwards to the embedded loader builder and stores the result back.
impl<O, C> ConfigBuilder for Builder<O, C> {
    /// Forwards `paths` to the loader builder's `config_default_paths`.
    fn config_default_paths<P, I>(mut self, paths: I) -> Self
    where
        I: IntoIterator<Item = P>,
        P: Into<PathBuf>,
    {
        self.config_loader = self.config_loader.config_default_paths(paths);
        self
    }

    /// Forwards `config` to the loader builder's `config_defaults`.
    fn config_defaults<D: Into<String>>(mut self, config: D) -> Self {
        self.config_loader = self.config_loader.config_defaults(config);
        self
    }

    /// Forwards `env` to the loader builder's `config_env`.
    fn config_env<E: Into<String>>(mut self, env: E) -> Self {
        self.config_loader = self.config_loader.config_env(env);
        self
    }

    /// Forwards `filter` to the loader builder's `config_filter`.
    fn config_filter<F: FnMut(&Path) -> bool + Send + 'static>(mut self, filter: F) -> Self {
        self.config_loader = self.config_loader.config_filter(filter);
        self
    }
}
impl<O, C> Extensible for Builder<O, C> {
type Opts = O;
type Config = C;
type Ok = Self;
const STARTED: bool = false;
fn before_config<F>(mut self, cback: F) -> Result<Self, Error>
where
F: FnOnce(&C, &O) -> Result<(), Error> + Send + 'static,
{
let mut cback = Some(cback);
let cback = move |conf: &C, opts: &O| (cback.take().unwrap())(conf, opts);
self.before_config.push(Box::new(cback));
Ok(self)
}
fn config_validator<F>(self, f: F) -> Result<Self, Error>
where
F: FnMut(&Arc<C>, &Arc<C>, &O) -> Result<Action, Error> + Send + 'static,
{
let mut validators = self.config_validators;
validators.push(Box::new(f));
Ok(Self {
config_validators: validators,
..self
})
}
fn config_mutator<F>(self, f: F) -> Self
where
F: FnMut(&mut C) + Send + 'static,
{
let mut mutators = self.config_mutators;
mutators.push(Box::new(f));
Self {
config_mutators: mutators,
..self
}
}
fn on_config<F: FnMut(&O, &Arc<C>) + Send + 'static>(self, hook: F) -> Self {
let mut hooks = self.config_hooks;
hooks.push(Box::new(hook));
Self {
config_hooks: hooks,
..self
}
}
fn on_signal<F>(self, signal: libc::c_int, hook: F) -> Result<Self, Error>
where
F: FnMut() + Send + 'static,
{
let mut hooks = self.sig_hooks;
hooks
.entry(signal)
.or_insert_with(Vec::new)
.push(Box::new(hook));
Ok(Self {
sig_hooks: hooks,
..self
})
}
fn on_terminate<F: FnOnce() + Send + 'static>(self, hook: F) -> Self {
let mut hook = Some(hook);
let mut hooks = self.terminate_hooks;
hooks.push(Box::new(move || {
(hook.take().expect("Termination hook called more than once"))();
}));
Self {
terminate_hooks: hooks,
..self
}
}
fn with<E>(self, ext: E) -> Result<Self::Ok, Error>
where
E: Extension<Self>,
{
ext.apply(self)
}
fn singleton<T: 'static>(&mut self) -> bool {
self.singletons.insert(TypeId::of::<T>())
}
fn run_before<B>(mut self, body: B) -> Result<Self, Error>
where
B: FnOnce(&Arc<Spirit<O, C>>) -> Result<(), Error> + Send + 'static,
{
self.before_bodies.push(Box::new(Some(body)));
Ok(self)
}
fn run_around<W>(mut self, wrapper: W) -> Result<Self, Error>
where
W: FnOnce(&Arc<Spirit<O, C>>, InnerBody) -> Result<(), Error> + Send + 'static,
{
let wrapper = move |(spirit, inner): (&_, _)| wrapper(spirit, inner);
self.body_wrappers.push(Box::new(Some(wrapper)));
Ok(self)
}
fn with_singleton<T>(mut self, singleton: T) -> Result<Self, Error>
where
T: Extension<Self::Ok> + 'static,
{
if self.singleton::<T>() {
self.with(singleton)
} else {
Ok(self)
}
}
fn keep_guard<G: Any + Send>(mut self, guard: G) -> Self {
self.guards.push(Box::new(guard));
self
}
fn autojoin_bg_thread(self) -> Self {
Self {
autojoin_bg_thread: true,
..self
}
}
}
/// The finishing operations of a builder: turning it into an `App` or running it directly.
pub trait SpiritBuilder: ConfigBuilder + Extensible {
/// Consumes the builder and produces an `App`.
///
/// `background_thread` selects whether the signal-handling background thread is started.
fn build(self, background_thread: bool) -> Result<App<Self::Opts, Self::Config>, Error>;
/// Consumes the builder and runs `body` as the application.
fn run<B>(self, body: B)
where
B: FnOnce(&Arc<Spirit<Self::Opts, Self::Config>>) -> Result<(), Error> + Send + 'static;
}
impl<O, C> SpiritBuilder for Builder<O, C>
where
Self::Config: DeserializeOwned + Send + Sync + 'static,
Self::Opts: StructOpt + Sync + Send + 'static,
{
/// Assembles the `Spirit`, loads the initial configuration and prepares the application body.
///
/// # Errors
///
/// Fails if a before-config callback fails, if the signal iterator cannot be created, or if
/// the initial configuration load or validation fails.
///
/// # Panics
///
/// Panics if signal hooks were registered but `background_thread` is `false`, or if the
/// background thread cannot be spawned.
fn build(mut self, background_thread: bool) -> Result<App<O, C>, Error> {
debug!("Building the spirit");
// Turn the loader builder into the options value and the actual loader.
let (opts, loader) = self.config_loader.build::<Self::Opts>();
// The before-config callbacks see the initial configuration, not a loaded one.
for before_config in &mut self.before_config {
before_config(&self.config, &opts).context("The before-config phase failed")?;
}
// Signals with user hooks plus the four handled internally; the set removes duplicates.
let interesting_signals = self
.sig_hooks
.keys()
.chain(&[libc::SIGHUP, libc::SIGTERM, libc::SIGQUIT, libc::SIGINT])
.cloned()
.collect::<HashSet<_>>();
let config = ArcSwap::from(Arc::from(self.config));
let signals = if background_thread {
Some(Signals::new(interesting_signals)?)
} else {
// Without the thread nobody would ever run the signal hooks.
assert!(
self.sig_hooks.is_empty(),
"Registered signals; now starting without a signal thread",
);
None
};
// One handle is stored in the spirit, the other one moves into the background thread.
let signals_spirit = signals.clone();
let spirit = Spirit {
autojoin_bg_thread: AtomicBool::new(self.autojoin_bg_thread),
config,
hooks: Mutex::new(Hooks {
config: self.config_hooks,
config_loader: loader,
config_mutators: self.config_mutators,
config_validators: self.config_validators,
sigs: self.sig_hooks,
singletons: self.singletons,
terminate: self.terminate_hooks,
terminated: false,
guards: self.guards,
}),
opts,
terminate: AtomicBool::new(false),
signals: signals_spirit,
bg_thread: Mutex::new(None),
};
// The first load goes through the same path as any later reload.
spirit
.config_reload()
.context("Problem loading the initial configuration")?;
let spirit = Arc::new(spirit);
if background_thread {
let spirit_bg = Arc::clone(&spirit);
let handle = thread::Builder::new()
.name("spirit".to_owned())
.spawn(move || {
// Callbacks run inside this thread; if one panics, the processing is restarted after a
// one second pause. A normal return from `background` ends the thread.
loop {
let run =
AssertUnwindSafe(|| spirit_bg.background(signals.as_ref().unwrap()));
if panic::catch_unwind(run).is_err() {
thread::sleep(Duration::from_secs(1));
info!("Restarting the spirit service thread after a panic");
} else {
break;
}
}
})
.unwrap();
*spirit.bg_thread.lock() = Some(handle);
}
debug!(
"Building bodies from {} before-bodies and {} wrappers",
self.before_bodies.len(),
self.body_wrappers.len()
);
let spirit_body = Arc::clone(&spirit);
let bodies = self.before_bodies;
// The inner body runs the before-bodies in order and stops at the first error.
let inner = move |()| {
for mut body in bodies {
body.run(&spirit_body)?;
}
Ok(())
};
let body_wrappers = self.body_wrappers;
let inner = InnerBody(Box::new(Some(inner)));
let spirit_body = Arc::clone(&spirit);
let mut wrapped = WrapBody(Box::new(Some(InnerBody::run)));
// NOTE(review): each iteration assigns a fresh `wrapped` that does not reference the previous
// one, so only the wrapper processed last survives -- confirm whether composition was intended.
for mut wrapper in body_wrappers.into_iter().rev() {
let spirit = Arc::clone(&spirit_body);
let applied = move |inner: InnerBody| wrapper.run((&spirit, inner));
wrapped = WrapBody(Box::new(Some(applied)));
}
Ok(App::new(spirit, inner, wrapped))
}
/// Runs the application by delegating to the `Result<Builder, Error>` implementation.
fn run<B: FnOnce(&Arc<Spirit<O, C>>) -> Result<(), Error> + Send + 'static>(self, body: B) {
Ok(self).run(body);
}
}
/// Lets a fallible chain of builder calls be finished without unwrapping it first.
impl<O, C> SpiritBuilder for Result<Builder<O, C>, Error>
where
    Self::Config: DeserializeOwned + Send + Sync + 'static,
    Self::Opts: StructOpt + Sync + Send + 'static,
{
    /// Builds the contained builder, or passes the already present error through.
    fn build(self, background_thread: bool) -> Result<App<O, C>, Error> {
        match self {
            Ok(builder) => builder.build(background_thread),
            Err(err) => Err(err),
        }
    }

    /// Builds the application with a background thread and runs `body` in it.
    ///
    /// Any error on the way is passed through `utils::log_errors`; if the result is an error,
    /// the process exits with status 1.
    fn run<B: FnOnce(&Arc<Spirit<O, C>>) -> Result<(), Error> + Send + 'static>(self, body: B) {
        let outcome = utils::log_errors("top-level", || {
            let builder = self?;
            let app = builder.build(true)?;
            // The body gets its own handle to the spirit, as `app` is consumed by `run`.
            let spirit = Arc::clone(app.spirit());
            app.run(move || body(&spirit))
        });
        if outcome.is_err() {
            process::exit(1);
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Not a `#[test]`: this function only has to compile. It checks that the runtime extension
// methods can be called on an owned `Arc<Spirit>` and chained.
fn _nonref_spirit_extensible() {
let app = Spirit::<Empty, Empty>::new().build(false).unwrap();
let spirit = Arc::clone(app.spirit());
spirit.on_terminate(|| ()).on_config(|_opts, _cfg| ());
}
}