use super::{CancellationToken, Runtime, RuntimeConfig};
use futures::Future;
use once_cell::sync::OnceCell;
use parking_lot::Mutex;
use std::time::Duration;
use tokio::{signal, task::JoinHandle};
static RT: OnceCell<tokio::runtime::Runtime> = OnceCell::new();
static RTHANDLE: OnceCell<tokio::runtime::Handle> = OnceCell::new();
static INIT: OnceCell<Mutex<Option<tokio::task::JoinHandle<anyhow::Result<()>>>>> = OnceCell::new();
const SHUTDOWN_MESSAGE: &str =
"Application received shutdown signal; attempting to gracefully shutdown";
const SHUTDOWN_TIMEOUT_MESSAGE: &str =
"Use DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT to control the graceful shutdown timeout";
pub const DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT: &str = "DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT";
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG: u64 = 5;
pub const DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE: u64 = 30;
#[derive(Debug, Clone)]
pub struct Worker {
runtime: Runtime,
config: RuntimeConfig,
}
impl Worker {
pub fn from_settings() -> anyhow::Result<Worker> {
let config = RuntimeConfig::from_settings()?;
Worker::from_config(config)
}
pub fn from_config(config: RuntimeConfig) -> anyhow::Result<Worker> {
if RT.get().is_some() || RTHANDLE.get().is_some() {
return Err(anyhow::anyhow!("Worker already initialized"));
}
let rt = RT.try_insert(config.create_runtime()?).map_err(|_| {
anyhow::anyhow!("Failed to create worker; Only a single Worker should ever be created")
})?;
let runtime = Runtime::from_handle(rt.handle().clone())?;
Ok(Worker { runtime, config })
}
pub fn runtime_from_existing() -> anyhow::Result<Runtime> {
if let Some(rt) = RT.get() {
Ok(Runtime::from_handle(rt.handle().clone())?)
} else if let Some(rt) = RTHANDLE.get() {
Ok(Runtime::from_handle(rt.clone())?)
} else {
Runtime::from_settings()
}
}
pub fn tokio_runtime(&self) -> anyhow::Result<&'static tokio::runtime::Runtime> {
RT.get()
.ok_or_else(|| anyhow::anyhow!("Worker not initialized"))
}
pub fn runtime(&self) -> &Runtime {
&self.runtime
}
pub fn execute<F, Fut>(self, f: F) -> anyhow::Result<()>
where
F: FnOnce(Runtime) -> Fut + Send + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let runtime = self.runtime.clone();
runtime.secondary().block_on(self.execute_internal(f))??;
runtime.shutdown();
Ok(())
}
pub async fn execute_async<F, Fut>(self, f: F) -> anyhow::Result<()>
where
F: FnOnce(Runtime) -> Fut + Send + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let runtime = self.runtime.clone();
let task = self.execute_internal(f);
task.await??;
runtime.shutdown();
Ok(())
}
fn execute_internal<F, Fut>(self, f: F) -> JoinHandle<anyhow::Result<()>>
where
F: FnOnce(Runtime) -> Fut + Send + 'static,
Fut: Future<Output = anyhow::Result<()>> + Send + 'static,
{
let runtime = self.runtime.clone();
let primary = runtime.primary();
let secondary = runtime.secondary();
let timeout = std::env::var(DYN_WORKER_GRACEFUL_SHUTDOWN_TIMEOUT)
.ok()
.and_then(|s| s.parse::<u64>().ok())
.unwrap_or({
if cfg!(debug_assertions) {
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_DEBUG
} else {
DEFAULT_GRACEFUL_SHUTDOWN_TIMEOUT_RELEASE
}
});
INIT.set(Mutex::new(Some(secondary.spawn(async move {
tokio::spawn(signal_handler(runtime.primary_token().clone()));
let cancel_token = runtime.child_token();
let (mut app_tx, app_rx) = tokio::sync::oneshot::channel::<()>();
let task: JoinHandle<anyhow::Result<()>> = primary.spawn(async move {
let _rx = app_rx;
f(runtime).await
});
tokio::select! {
_ = cancel_token.cancelled() => {
tracing::debug!("{}", SHUTDOWN_MESSAGE);
tracing::debug!("{} {} seconds", SHUTDOWN_TIMEOUT_MESSAGE, timeout);
}
_ = app_tx.closed() => {
}
};
let result = tokio::select! {
result = task => {
result
}
_ = tokio::time::sleep(tokio::time::Duration::from_secs(timeout)) => {
tracing::debug!("Application did not shutdown in time; terminating");
std::process::exit(911);
}
}?;
match &result {
Ok(_) => {
tracing::debug!("Application shutdown successfully");
}
Err(e) => {
tracing::error!("Application shutdown with error: {:?}", e);
}
}
result
}))))
.expect("Failed to spawn application task");
INIT
.get()
.expect("Application task not initialized")
.lock()
.take()
.expect("Application initialized; but another thread is awaiting it; Worker.execute() can only be called once")
}
pub fn from_current() -> anyhow::Result<Worker> {
if RT.get().is_some() || RTHANDLE.get().is_some() {
return Err(anyhow::anyhow!("Worker already initialized"));
}
let runtime = Runtime::from_current()?;
let config = RuntimeConfig::from_settings()?;
Ok(Worker { runtime, config })
}
}
async fn signal_handler(cancel_token: CancellationToken) -> anyhow::Result<()> {
let ctrl_c = async {
signal::ctrl_c().await?;
anyhow::Ok(())
};
let sigterm = async {
signal::unix::signal(signal::unix::SignalKind::terminate())?
.recv()
.await;
anyhow::Ok(())
};
tokio::select! {
_ = ctrl_c => {
tracing::info!("Ctrl+C received, starting graceful shutdown");
},
_ = sigterm => {
tracing::info!("SIGTERM received, starting graceful shutdown");
},
_ = cancel_token.cancelled() => {
tracing::debug!("CancellationToken triggered; shutting down");
},
}
cancel_token.cancel();
Ok(())
}