use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::{JoinHandle, Scope, ScopedJoinHandle};
use signal_hook::consts::TERM_SIGNALS;
use signal_hook::flag;
use crate::settings::Settings;
use crate::{Error, Worker};
#[inline]
pub(crate) fn enable_graceful_shutdown(shutdown: &Shutdown) {
for sig in TERM_SIGNALS {
let _ = flag::register_conditional_shutdown(*sig, 1, shutdown.as_ref().clone());
let _ = flag::register(*sig, shutdown.as_ref().clone());
}
}
pub(crate) fn spawn_thread<W, C>(
mut worker: W,
settings: Settings,
cores: Option<C>,
shutdown: &Shutdown,
) -> Result<JoinHandle<()>, Error>
where
W: Worker + 'static,
C: AsRef<[usize]> + Send + 'static,
{
let shutdown = shutdown.clone();
settings
.into_inner()
.spawn(move || {
if let Some(cores) = cores.as_ref() {
let _ = affinity::set_thread_affinity(cores);
}
worker.run(shutdown);
})
.map_err(Error::thread)
}
pub(crate) fn spawn_scoped_thread<'scope, 'env, W, C>(
scope: &'scope Scope<'scope, 'env>,
mut worker: W,
settings: Settings,
cores: Option<C>,
shutdown: &Shutdown,
) -> Result<ScopedJoinHandle<'scope, ()>, Error>
where
W: Worker + 'env,
C: AsRef<[usize]> + Send + 'env,
{
let shutdown = shutdown.clone();
settings
.into_inner()
.spawn_scoped(scope, move || {
if let Some(cores) = cores.as_ref() {
let _ = affinity::set_thread_affinity(cores);
}
worker.run(shutdown);
})
.map_err(Error::thread)
}
#[derive(Debug, Default)]
pub struct Shutdown(Arc<AtomicBool>);
impl Shutdown {
#[inline]
pub(crate) fn new() -> Self {
Self::default()
}
#[inline]
pub fn stop(&self) {
self.0.store(true, Ordering::SeqCst)
}
#[inline]
pub fn is_running(&self) -> bool {
!self.0.load(Ordering::SeqCst)
}
}
impl AsRef<Arc<AtomicBool>> for Shutdown {
#[inline]
fn as_ref(&self) -> &Arc<AtomicBool> {
&self.0
}
}
impl Clone for Shutdown {
#[inline]
fn clone(&self) -> Self {
Self(self.0.clone())
}
}