pub mod configuration;
#[cfg(unix)]
mod daemon;
#[cfg(unix)]
pub(crate) mod transfer_fd;
#[cfg(unix)]
use daemon::daemonize;
use log::{debug, error, info, warn};
use pingora_runtime::Runtime;
use pingora_timeout::fast_timeout;
#[cfg(feature = "sentry")]
use sentry::ClientOptions;
use std::sync::Arc;
use std::thread;
#[cfg(unix)]
use tokio::signal::unix;
use tokio::sync::{watch, Mutex};
use tokio::time::{sleep, Duration};
use crate::services::Service;
use configuration::{Opt, ServerConf};
#[cfg(unix)]
pub use transfer_fd::Fds;
use pingora_error::{Error, ErrorType, Result};
/// Default grace period, in seconds (five minutes), that `run_forever` waits
/// after a graceful shutdown is requested when the configuration does not set
/// `grace_period_seconds`.
const EXIT_TIMEOUT: u64 = 5 * 60;
/// Seconds `main_loop` waits after trying to hand the listener sockets to the
/// new process before it broadcasts the graceful shutdown.
const CLOSE_TIMEOUT: u64 = 5;
/// The kind of shutdown requested by the signal that ended `main_loop`.
///
/// The derives make the value cheap to pass around, comparable with `==`, and
/// printable in log messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownType {
    /// Wait for the grace period, then give the runtimes a bounded amount of
    /// time to finish.
    Graceful,
    /// Skip the grace period and shut the runtimes down with a zero timeout.
    Quick,
}
/// Receiving half of the server's shutdown channel, handed to every service.
///
/// The channel is created holding `false`; the server sends `true` when it
/// broadcasts a graceful shutdown.
pub type ShutdownWatch = watch::Receiver<bool>;
/// Shared, mutex-protected handle to the listener file descriptor table
/// ([`Fds`]) that is passed to each service on unix.
#[cfg(unix)]
pub type ListenFds = Arc<Mutex<Fds>>;
/// A server process made up of a set of services.
///
/// It owns the registered services, the shutdown broadcast channel, and the
/// configuration and command-line options it was built from.
pub struct Server {
/// Services registered through `add_service` / `add_services`; they are
/// removed from this list and started by `run_forever`.
services: Vec<Box<dyn Service>>,
/// Listener file descriptors shared with every service. `None` until
/// `bootstrap` has loaded them.
#[cfg(unix)]
listen_fds: Option<ListenFds>,
/// Sending half of the shutdown channel; `true` is sent to request a
/// graceful shutdown.
shutdown_watch: watch::Sender<bool>,
/// Receiving half of the shutdown channel; cloned for each service when it
/// is started.
shutdown_recv: ShutdownWatch,
/// The configuration the server runs with.
pub configuration: Arc<ServerConf>,
/// The command-line options the server was constructed with, if any.
pub options: Option<Opt>,
/// Sentry client options. When set, Sentry is initialized from them in
/// `bootstrap` and `run_forever`, but only in builds compiled without
/// debug assertions.
#[cfg(feature = "sentry")]
#[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
pub sentry: Option<ClientOptions>,
}
impl Server {
/// Waits for the first termination signal and reports which kind of shutdown
/// it asks for.
///
/// * `SIGINT` returns [`ShutdownType::Quick`] without notifying the services.
/// * `SIGTERM` broadcasts `true` on the shutdown channel and returns
///   [`ShutdownType::Graceful`], whether or not the broadcast succeeded.
/// * `SIGQUIT` tries to send the listener sockets to the configured upgrade
///   socket, waits `CLOSE_TIMEOUT` seconds, broadcasts the shutdown and
///   returns [`ShutdownType::Graceful`]. If no listener fds were loaded, it
///   returns [`ShutdownType::Graceful`] right away without broadcasting.
///
/// # Panics
///
/// Panics if any of the three signal handlers cannot be registered.
#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
    let mut sigquit = unix::signal(unix::SignalKind::quit()).unwrap();
    let mut sigterm = unix::signal(unix::SignalKind::terminate()).unwrap();
    let mut sigint = unix::signal(unix::SignalKind::interrupt()).unwrap();

    tokio::select! {
        _ = sigint.recv() => {
            info!("SIGINT received, exiting");
            ShutdownType::Quick
        },
        _ = sigterm.recv() => {
            info!("SIGTERM received, gracefully exiting");
            info!("Broadcasting graceful shutdown");
            if let Err(e) = self.shutdown_watch.send(true) {
                error!("Graceful shutdown broadcast failed: {e}");
            } else {
                info!("Graceful shutdown started!");
            }
            info!("Broadcast graceful shutdown complete");
            ShutdownType::Graceful
        },
        _ = sigquit.recv() => {
            info!("SIGQUIT received, sending socks and gracefully exiting");
            match self.listen_fds.as_ref() {
                None => {
                    info!("No socks to send, shutting down.");
                    ShutdownType::Graceful
                }
                Some(shared_fds) => {
                    // The lock is held until this arm finishes, i.e. across
                    // the socket hand-off, the wait and the broadcast.
                    let locked_fds = shared_fds.lock().await;
                    info!("Trying to send socks");
                    let upgrade_sock = self.configuration.as_ref().upgrade_sock.as_str();
                    if let Err(e) = locked_fds.send_to_sock(upgrade_sock) {
                        error!("Unable to send listener sockets to new process: {e}");
                        #[cfg(all(not(debug_assertions), feature = "sentry"))]
                        sentry::capture_error(&e);
                    } else {
                        info!("listener sockets sent");
                    }
                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
                    info!("Broadcasting graceful shutdown");
                    if let Err(e) = self.shutdown_watch.send(true) {
                        error!("Graceful shutdown broadcast failed: {e}");
                        return ShutdownType::Graceful;
                    }
                    info!("Graceful shutdown started!");
                    info!("Broadcast graceful shutdown complete");
                    ShutdownType::Graceful
                }
            }
        },
    }
}
/// Starts `service` on a newly created runtime and returns that runtime.
///
/// The runtime is created through `create_runtime` with the service's name,
/// `threads` and `work_stealing`. The service's `start_service` future is
/// spawned onto it with the listener fds (unix only) and the shutdown
/// receiver. The runtime is handed back to the caller instead of being
/// dropped here, so the caller decides when it is shut down.
fn run_service(
    mut service: Box<dyn Service>,
    #[cfg(unix)] fds: Option<ListenFds>,
    shutdown: ShutdownWatch,
    threads: usize,
    work_stealing: bool,
) -> Runtime {
    let runtime = Server::create_runtime(service.name(), threads, work_stealing);
    let service_task = async move {
        service
            .start_service(
                #[cfg(unix)]
                fds,
                shutdown,
            )
            .await;
        info!("service exited.")
    };
    runtime.get_handle().spawn(service_task);
    runtime
}
#[cfg(unix)]
fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
let mut fds = Fds::new();
if upgrade {
debug!("Trying to receive socks");
fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
}
self.listen_fds = Some(Arc::new(Mutex::new(fds)));
Ok(())
}
/// Creates a server from an already built configuration and optional
/// command-line options.
///
/// When options are given they are merged into `conf` with
/// `merge_with_opt`. A configuration file path in the options is not
/// loaded; a warning is logged and the provided `conf` is used instead.
pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
    let options: Option<Opt> = raw_opt.into();
    match options.as_ref() {
        Some(opts) => {
            if let Some(c) = &opts.conf {
                warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
            }
            conf.merge_with_opt(opts);
        }
        None => {}
    }

    let (shutdown_tx, shutdown_rx) = watch::channel(false);
    Server {
        services: Vec::new(),
        #[cfg(unix)]
        listen_fds: None,
        shutdown_watch: shutdown_tx,
        shutdown_recv: shutdown_rx,
        configuration: Arc::new(conf),
        options,
        #[cfg(feature = "sentry")]
        sentry: None,
    }
}
pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
let opt = opt.into();
let (tx, rx) = watch::channel(false);
let conf = if let Some(opt) = opt.as_ref() {
opt.conf.as_ref().map_or_else(
|| {
ServerConf::new_with_opt_override(opt).ok_or_else(|| {
Error::explain(ErrorType::ReadError, "Conf generation failed")
})
},
|_| {
ServerConf::load_yaml_with_opt_override(opt)
},
)
} else {
ServerConf::new()
.ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
}?;
Ok(Server {
services: vec![],
#[cfg(unix)]
listen_fds: None,
shutdown_watch: tx,
shutdown_recv: rx,
configuration: Arc::new(conf),
options: opt,
#[cfg(feature = "sentry")]
sentry: None,
})
}
/// Registers a single service with the server.
///
/// The service is boxed and appended to the list that `run_forever` starts.
pub fn add_service(&mut self, service: impl Service + 'static) {
    let boxed: Box<dyn Service> = Box::new(service);
    self.services.push(boxed);
}
/// Registers several already boxed services at once.
///
/// They are appended, in the given order, to the list that `run_forever`
/// starts.
pub fn add_services(&mut self, mut services: Vec<Box<dyn Service>>) {
    self.services.append(&mut services);
}
/// Prepares the server before its services are run.
///
/// * If the options have `test` set, logs that the test passed and exits
///   the process with status 0.
/// * On unix, loads the listener fds (receiving them from the upgrade socket
///   when the options have `upgrade` set). On failure the error is logged
///   and the process exits with status 1.
///
/// Sentry is initialized for the duration of this call only in builds
/// without debug assertions and with the `sentry` feature enabled.
pub fn bootstrap(&mut self) {
    info!("Bootstrap starting");
    debug!("{:#?}", self.options);

    #[cfg(all(not(debug_assertions), feature = "sentry"))]
    let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));

    let test_only = matches!(self.options.as_ref(), Some(o) if o.test);
    if test_only {
        info!("Server Test passed, exiting");
        std::process::exit(0);
    }

    #[cfg(unix)]
    {
        let upgrade = matches!(self.options.as_ref(), Some(o) if o.upgrade);
        if let Err(e) = self.load_fds(upgrade) {
            #[cfg(all(not(debug_assertions), feature = "sentry"))]
            sentry::capture_error(&e);
            error!("Bootstrap failed on error: {:?}, exiting.", e);
            std::process::exit(1);
        }
        info!("Bootstrap done");
    }
}
/// Starts every registered service and blocks until the process exits.
///
/// In order, this function:
/// 1. daemonizes the process when `daemon` is set in the configuration
///    (unix only),
/// 2. starts each registered service on its own runtime,
/// 3. on unix, blocks in `main_loop` until a shutdown signal arrives,
/// 4. for a graceful shutdown, sleeps for the grace period,
/// 5. shuts down every service runtime and exits the process with status 0.
///
/// This function never returns.
///
/// # Panics
///
/// On windows builds, panics if `daemon` is set in the configuration.
pub fn run_forever(mut self) -> ! {
info!("Server starting");
let conf = self.configuration.as_ref();
// Daemonize before any service runtime is created below.
// NOTE(review): `pause_for_fork` / `unpause` presumably suspend the
// fast_timeout machinery so it survives the fork — confirm in pingora_timeout.
#[cfg(unix)]
if conf.daemon {
info!("Daemonizing the server");
fast_timeout::pause_for_fork();
daemonize(&self.configuration);
fast_timeout::unpause();
}
#[cfg(windows)]
if conf.daemon {
panic!("Daemonizing under windows is not supported");
}
// Sentry is only initialized in builds without debug assertions.
#[cfg(all(not(debug_assertions), feature = "sentry"))]
let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
// One runtime per service. The runtimes are kept here so they can be shut
// down explicitly at the end.
let mut runtimes: Vec<Runtime> = Vec::new();
// `pop` takes from the back, so services are started in reverse
// registration order.
while let Some(service) = self.services.pop() {
// A service's own thread count wins over the configured default.
let threads = service.threads().unwrap_or(conf.threads);
let runtime = Server::run_service(
service,
#[cfg(unix)]
self.listen_fds.clone(),
self.shutdown_recv.clone(),
threads,
conf.work_stealing,
);
runtimes.push(runtime);
}
// A dedicated single-threaded, work-stealing runtime drives the signal loop.
let server_runtime = Server::create_runtime("Server", 1, true);
// Block this thread until a shutdown signal is received.
#[cfg(unix)]
let shutdown_type = server_runtime.get_handle().block_on(self.main_loop());
// There is no signal loop on windows: the shutdown sequence below starts
// immediately as a graceful shutdown.
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;
if matches!(shutdown_type, ShutdownType::Graceful) {
// Grace period: configured value, or EXIT_TIMEOUT seconds by default.
let exit_timeout = self
.configuration
.as_ref()
.grace_period_seconds
.unwrap_or(EXIT_TIMEOUT);
info!("Graceful shutdown: grace period {}s starts", exit_timeout);
thread::sleep(Duration::from_secs(exit_timeout));
info!("Graceful shutdown: grace period ends");
}
// How long each runtime is given to exit: nothing for a quick shutdown,
// the configured value (default 5 seconds) for a graceful one.
let shutdown_timeout = match shutdown_type {
ShutdownType::Quick => Duration::from_secs(0),
ShutdownType::Graceful => Duration::from_secs(
self.configuration
.as_ref()
.graceful_shutdown_timeout_seconds
.unwrap_or(5),
),
};
// Shut the runtimes down in parallel, one thread per runtime.
let shutdowns: Vec<_> = runtimes
.into_iter()
.map(|rt| {
info!("Waiting for runtimes to exit!");
thread::spawn(move || {
rt.shutdown_timeout(shutdown_timeout);
// NOTE(review): sleeps for the timeout again after the shutdown call
// returns — confirm this extra wait is intended.
thread::sleep(shutdown_timeout)
})
})
.collect();
// Wait for every shutdown thread; a panicked thread is only logged.
for shutdown in shutdowns {
if let Err(e) = shutdown.join() {
error!("Failed to shutdown runtime: {:?}", e);
}
}
info!("All runtimes exited, exiting now");
std::process::exit(0)
}
/// Builds a runtime called `name` with `threads` threads.
///
/// `work_steal` selects between `Runtime::new_steal` (true) and
/// `Runtime::new_no_steal` (false).
fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
    if !work_steal {
        return Runtime::new_no_steal(threads, name);
    }
    Runtime::new_steal(threads, name)
}
}