mod bootstrap_services;
pub mod configuration;
#[cfg(unix)]
mod daemon;
#[cfg(unix)]
pub(crate) mod transfer_fd;
use async_trait::async_trait;
#[cfg(unix)]
use daemon::daemonize;
use daggy::NodeIndex;
use log::{debug, error, info, warn};
use parking_lot::Mutex;
use pingora_runtime::Runtime;
use pingora_timeout::fast_timeout;
#[cfg(feature = "sentry")]
use sentry::ClientOptions;
use std::sync::Arc;
use std::thread;
use std::time::SystemTime;
#[cfg(unix)]
use tokio::signal::unix;
use tokio::sync::{broadcast, watch, Mutex as TokioMutex};
use tokio::time::{sleep, Duration};
use crate::prelude::background_service;
use crate::server::bootstrap_services::{Bootstrap, BootstrapService, SentryInitService};
use crate::services::{
DependencyGraph, ServiceHandle, ServiceReadyNotifier, ServiceReadyWatch, ServiceWithDependents,
};
use configuration::{Opt, ServerConf};
use std::collections::HashMap;
#[cfg(unix)]
pub use transfer_fd::Fds;
use pingora_error::{Error, ErrorType, Result};
/// Default grace period, in seconds, before runtimes are torn down on a
/// graceful shutdown; used when `grace_period_seconds` is not configured.
const EXIT_TIMEOUT: u64 = 60 * 5;
/// Seconds to keep running after listener FDs have been transferred to the
/// new process during a graceful upgrade, before broadcasting shutdown.
const CLOSE_TIMEOUT: u64 = 5;
/// How the server winds down once the main loop returns.
enum ShutdownType {
    /// Observe the grace period and give runtimes a shutdown timeout to drain.
    Graceful,
    /// Skip the grace period and shut runtimes down with a zero timeout.
    Quick,
}
/// Book-keeping for one registered service: the boxed service itself, the
/// handle handed back to the caller, and the single-use readiness notifier.
pub(crate) struct ServiceWrapper {
    // Taken exactly once when the service is started in `run()`.
    ready_notifier: Option<ServiceReadyNotifier>,
    service: Box<dyn ServiceWithDependents>,
    service_handle: ServiceHandle,
}
/// Observable lifecycle phases of the server, broadcast to subscribers of
/// [`Server::watch_execution_phase`]. Marked non-exhaustive: new phases may
/// be added without a breaking change.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ExecutionPhase {
    Setup,
    Bootstrap,
    BootstrapComplete,
    /// Main loop entered; services have been started.
    Running,
    /// SIGQUIT received; listener FDs are being sent to the new process.
    GracefulUpgradeTransferringFds,
    /// FDs sent; waiting `CLOSE_TIMEOUT` before broadcasting shutdown.
    GracefulUpgradeCloseTimeout,
    /// SIGTERM received; graceful shutdown has been broadcast to services.
    GracefulTerminate,
    /// Main loop exited; shutdown sequence begins.
    ShutdownStarted,
    /// Sleeping through the configured grace period before teardown.
    ShutdownGracePeriod,
    /// Service runtimes are being shut down.
    ShutdownRuntimes,
    /// All runtimes have exited.
    Terminated,
}
/// Watch receiver that flips to `true` once graceful shutdown is broadcast.
pub type ShutdownWatch = watch::Receiver<bool>;
/// Shared listener file descriptors handed to services (unix only).
#[cfg(unix)]
pub type ListenFds = Arc<TokioMutex<Fds>>;
/// A shutdown request as delivered by a [`ShutdownSignalWatch`].
#[derive(Debug)]
pub enum ShutdownSignal {
    /// Transfer listener sockets to a replacement process, then exit gracefully.
    GracefulUpgrade,
    /// Exit gracefully: broadcast shutdown and drain.
    GracefulTerminate,
    /// Exit as quickly as possible.
    FastShutdown,
}
/// Source of shutdown requests. Implement this to customize how the server
/// is told to stop; on unix the default listens for POSIX signals.
#[async_trait]
pub trait ShutdownSignalWatch {
    /// Resolves once a shutdown request has been received.
    async fn recv(&self) -> ShutdownSignal;
}
/// Default unix signal watcher: SIGQUIT → graceful upgrade, SIGTERM →
/// graceful terminate, SIGINT → fast shutdown.
#[cfg(unix)]
pub struct UnixShutdownSignalWatch;
#[cfg(unix)]
#[async_trait]
impl ShutdownSignalWatch for UnixShutdownSignalWatch {
    /// Resolves when the first of SIGQUIT (graceful upgrade), SIGTERM
    /// (graceful terminate), or SIGINT (fast shutdown) arrives.
    ///
    /// # Panics
    /// Panics if a signal handler cannot be registered — that only happens
    /// outside a tokio runtime or when the OS rejects the handler, both of
    /// which are bugs here, so we `expect` with a reason rather than
    /// silently `unwrap`.
    async fn recv(&self) -> ShutdownSignal {
        let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit())
            .expect("failed to register SIGQUIT handler");
        let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate())
            .expect("failed to register SIGTERM handler");
        let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt())
            .expect("failed to register SIGINT handler");
        // Race the three streams; whichever fires first decides the shutdown kind.
        tokio::select! {
            _ = graceful_upgrade_signal.recv() => {
                ShutdownSignal::GracefulUpgrade
            },
            _ = graceful_terminate_signal.recv() => {
                ShutdownSignal::GracefulTerminate
            },
            _ = fast_shutdown_signal.recv() => {
                ShutdownSignal::FastShutdown
            },
        }
    }
}
/// Arguments to [`Server::run`].
pub struct RunArgs {
    /// Where shutdown requests come from (unix only; windows uses Ctrl+C
    /// inside the main loop).
    #[cfg(unix)]
    pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}
impl Default for RunArgs {
    /// Unix default: watch POSIX signals via [`UnixShutdownSignalWatch`].
    #[cfg(unix)]
    fn default() -> Self {
        Self {
            shutdown_signal: Box::new(UnixShutdownSignalWatch),
        }
    }
    /// Windows default: no fields — the windows main loop listens for
    /// Ctrl+C directly.
    #[cfg(windows)]
    fn default() -> Self {
        Self {}
    }
}
/// The server: owns all registered services, their dependency graph, the
/// bootstrap state, and the shutdown/phase broadcast channels.
pub struct Server {
    // Services queued via `add_init_service`; registered and made
    // dependencies of every other service when `run()` starts.
    init_services: Vec<Box<dyn ServiceWithDependents + 'static>>,
    // Registered services keyed by their dependency-graph node index.
    services: HashMap<NodeIndex, ServiceWrapper>,
    // Sender half: sending `true` broadcasts graceful shutdown.
    shutdown_watch: watch::Sender<bool>,
    // Receiver half, cloned into each service runtime.
    shutdown_recv: ShutdownWatch,
    // Lifecycle phase broadcast; see `watch_execution_phase`.
    execution_phase_watch: broadcast::Sender<ExecutionPhase>,
    // Startup-order dependency graph, shared with service handles.
    dependencies: Arc<Mutex<DependencyGraph>>,
    bootstrap: Arc<Mutex<Bootstrap>>,
    /// Effective server configuration.
    pub configuration: Arc<ServerConf>,
    /// Command-line options, if any were supplied.
    pub options: Option<Opt>,
}
impl Server {
    /// Subscribes to [`ExecutionPhase`] transitions broadcast by the server.
    pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
        self.execution_phase_watch.subscribe()
    }
    /// Runs until a shutdown request arrives, broadcasts shutdown to the
    /// services where appropriate, and reports how to wind down.
    ///
    /// SIGINT → quick exit; SIGTERM → graceful terminate; SIGQUIT →
    /// graceful upgrade (listener FDs are sent to the new process first).
    #[cfg(unix)]
    async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
        self.execution_phase_watch
            .send(ExecutionPhase::Running)
            .ok();
        // Block here until the configured signal watcher fires.
        match run_args.shutdown_signal.recv().await {
            ShutdownSignal::FastShutdown => {
                info!("SIGINT received, exiting");
                ShutdownType::Quick
            }
            ShutdownSignal::GracefulTerminate => {
                info!("SIGTERM received, gracefully exiting");
                info!("Broadcasting graceful shutdown");
                // Flip the shared watch to `true`; every service runtime
                // observes this. A send error means no receivers remain.
                match self.shutdown_watch.send(true) {
                    Ok(_) => {
                        info!("Graceful shutdown started!");
                    }
                    Err(e) => {
                        error!("Graceful shutdown broadcast failed: {e}");
                    }
                }
                info!("Broadcast graceful shutdown complete");
                self.execution_phase_watch
                    .send(ExecutionPhase::GracefulTerminate)
                    .ok();
                ShutdownType::Graceful
            }
            ShutdownSignal::GracefulUpgrade => {
                info!("SIGQUIT received, sending socks and gracefully exiting");
                self.execution_phase_watch
                    .send(ExecutionPhase::GracefulUpgradeTransferringFds)
                    .ok();
                if let Some(fds) = self.listen_fds() {
                    let fds = fds.lock().await;
                    info!("Trying to send socks");
                    // Hand the listener FDs to the replacement process
                    // over the configured upgrade socket.
                    match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
                        Ok(_) => {
                            info!("listener sockets sent");
                        }
                        Err(e) => {
                            error!("Unable to send listener sockets to new process: {e}");
                            #[cfg(all(not(debug_assertions), feature = "sentry"))]
                            sentry::capture_error(&e);
                        }
                    }
                    self.execution_phase_watch
                        .send(ExecutionPhase::GracefulUpgradeCloseTimeout)
                        .ok();
                    // Keep serving briefly so the new process can start
                    // accepting before this one begins shutting down.
                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
                    info!("Broadcasting graceful shutdown");
                    match self.shutdown_watch.send(true) {
                        Ok(_) => {
                            info!("Graceful shutdown started!");
                        }
                        Err(e) => {
                            error!("Graceful shutdown broadcast failed: {e}");
                            return ShutdownType::Graceful;
                        }
                    }
                    info!("Broadcast graceful shutdown complete");
                    ShutdownType::Graceful
                } else {
                    // No FDs were received/held, so there is nothing to
                    // transfer; proceed straight to graceful shutdown.
                    info!("No socks to send, shutting down.");
                    ShutdownType::Graceful
                }
            }
        }
    }
#[cfg(windows)]
async fn main_loop(&self, _run_args: RunArgs) -> ShutdownType {
self.execution_phase_watch
.send(ExecutionPhase::Running)
.ok();
match tokio::signal::ctrl_c().await {
Ok(()) => {
info!("Ctrl+C received, gracefully exiting");
info!("Broadcasting graceful shutdown");
match self.shutdown_watch.send(true) {
Ok(_) => {
info!("Graceful shutdown started!");
}
Err(e) => {
error!("Graceful shutdown broadcast failed: {e}");
}
}
info!("Broadcast graceful shutdown complete");
self.execution_phase_watch
.send(ExecutionPhase::GracefulTerminate)
.ok();
ShutdownType::Graceful
}
Err(e) => {
error!("Unable to listen for shutdown signal: {}", e);
ShutdownType::Quick
}
}
}
#[cfg(feature = "sentry")]
#[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
pub fn set_sentry_config(&mut self, sentry_config: ClientOptions) {
self.bootstrap.lock().set_sentry_config(Some(sentry_config));
}
    // Shared listener FDs held by bootstrap, if any; passed to services at
    // startup and sent to the new process during a graceful upgrade.
    #[cfg(unix)]
    fn listen_fds(&self) -> Option<ListenFds> {
        self.bootstrap.lock().get_fds()
    }
#[allow(clippy::too_many_arguments)]
fn run_service(
mut service: Box<dyn ServiceWithDependents>,
#[cfg(unix)] fds: Option<ListenFds>,
shutdown: ShutdownWatch,
threads: usize,
work_stealing: bool,
listeners_per_fd: usize,
ready_notifier: ServiceReadyNotifier,
dependency_watches: Vec<ServiceReadyWatch>,
) -> Runtime
{
let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
let service_name = service.name().to_string();
service_runtime.get_handle().spawn(async move {
let mut time_waited_opt: Option<Duration> = None;
for mut watch in dependency_watches {
let start = SystemTime::now();
if watch.wait_for(|&ready| ready).await.is_err() {
error!(
"Service '{}' dependency channel closed before ready",
service_name
);
}
*time_waited_opt.get_or_insert_default() += start.elapsed().unwrap_or_default()
}
if let Some(time_waited) = time_waited_opt {
service.on_startup_delay(time_waited);
}
service
.start_service(
#[cfg(unix)]
fds,
shutdown,
listeners_per_fd,
ready_notifier,
)
.await;
info!("service '{}' exited.", service_name);
});
service_runtime
}
pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
let opt = raw_opt.into();
if let Some(opts) = &opt {
if let Some(c) = opts.conf.as_ref() {
warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
}
conf.merge_with_opt(opts);
}
let (tx, rx) = watch::channel(false);
let execution_phase_watch = broadcast::channel(100).0;
let bootstrap = Arc::new(Mutex::new(Bootstrap::new(
&opt,
&conf,
&execution_phase_watch,
)));
Server {
services: Default::default(),
init_services: Default::default(),
shutdown_watch: tx,
shutdown_recv: rx,
execution_phase_watch,
configuration: Arc::new(conf),
options: opt,
dependencies: Arc::new(Mutex::new(DependencyGraph::new())),
bootstrap,
}
}
pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
let opt = opt.into();
let (tx, rx) = watch::channel(false);
let execution_phase_watch = broadcast::channel(100).0;
let conf = if let Some(opt) = opt.as_ref() {
opt.conf.as_ref().map_or_else(
|| {
ServerConf::new_with_opt_override(opt).ok_or_else(|| {
Error::explain(ErrorType::ReadError, "Conf generation failed")
})
},
|_| {
ServerConf::load_yaml_with_opt_override(opt)
},
)
} else {
ServerConf::new()
.ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
}?;
let bootstrap = Arc::new(Mutex::new(Bootstrap::new(
&opt,
&conf,
&execution_phase_watch,
)));
Ok(Server {
services: Default::default(),
init_services: Default::default(),
shutdown_watch: tx,
shutdown_recv: rx,
execution_phase_watch,
configuration: Arc::new(conf),
options: opt,
dependencies: Arc::new(Mutex::new(DependencyGraph::new())),
bootstrap,
})
}
fn add_init_service(&mut self, service: impl ServiceWithDependents + 'static) {
let boxed_service = Box::new(service);
self.init_services.push(boxed_service);
}
fn apply_init_service_dependencies(&mut self) {
let services = self
.services
.values()
.map(|service| service.service_handle.clone())
.collect::<Vec<_>>();
let global_deps = self
.init_services
.drain(..)
.collect::<Vec<_>>()
.into_iter()
.map(|dep| self.add_boxed_service(dep))
.collect::<Vec<_>>();
for service in services {
service.add_dependencies(&global_deps);
}
}
    /// Registers a service and returns its handle; boxes the service and
    /// forwards to [`Server::add_boxed_service`].
    pub fn add_service(&mut self, service: impl ServiceWithDependents + 'static) -> ServiceHandle {
        self.add_boxed_service(Box::new(service))
    }
pub fn add_boxed_service(
&mut self,
service_box: Box<dyn ServiceWithDependents>,
) -> ServiceHandle {
let name = service_box.name().to_string();
let (tx, rx) = watch::channel(false);
let id = self.dependencies.lock().add_node(name.clone(), rx.clone());
let service_handle = ServiceHandle::new(id, name, rx, &self.dependencies);
let wrapper = ServiceWrapper {
ready_notifier: Some(ServiceReadyNotifier::new(tx)),
service: service_box,
service_handle: service_handle.clone(),
};
self.services.insert(id, wrapper);
service_handle
}
pub fn add_services(
&mut self,
services: Vec<Box<dyn ServiceWithDependents>>,
) -> Vec<ServiceHandle> {
services
.into_iter()
.map(|service| self.add_boxed_service(service))
.collect()
}
    /// Runs the bootstrap steps synchronously on the caller's thread.
    pub fn bootstrap(&mut self) {
        self.bootstrap.lock().bootstrap();
    }
    /// Registers bootstrap as a background service instead of running it
    /// synchronously, and returns its handle so other services can depend
    /// on it. The sentry init service is queued as an init service, which
    /// makes every registered service wait on it at startup.
    pub fn bootstrap_as_a_service(&mut self) -> ServiceHandle {
        let bootstrap_service =
            background_service("Bootstrap Service", BootstrapService::new(&self.bootstrap));
        let sentry_service = background_service(
            "Sentry Init Service",
            SentryInitService::new(&self.bootstrap),
        );
        self.add_init_service(sentry_service);
        self.add_service(bootstrap_service)
    }
    /// Runs the server with default [`RunArgs`] and exits the process once
    /// it terminates; never returns to the caller.
    pub fn run_forever(self) -> ! {
        self.run(RunArgs::default());
        std::process::exit(0)
    }
pub fn run(mut self, run_args: RunArgs) {
self.apply_init_service_dependencies();
info!("Server starting");
let conf = self.configuration.as_ref();
#[cfg(unix)]
if conf.daemon {
info!("Daemonizing the server");
fast_timeout::pause_for_fork();
daemonize(&self.configuration);
fast_timeout::unpause();
}
#[cfg(windows)]
if conf.daemon {
panic!("Daemonizing under windows is not supported");
}
let mut runtimes: Vec<(Runtime, String)> = Vec::new();
let startup_order = match self.dependencies.lock().topological_sort() {
Ok(order) => order,
Err(e) => {
error!("Failed to determine service startup order: {}", e);
std::process::exit(1);
}
};
let service_names: Vec<String> = startup_order
.iter()
.map(|(_, service)| service.name.clone())
.collect();
info!("Starting services in dependency order: {:?}", service_names);
for (service_id, service) in startup_order {
let mut wrapper = match self.services.remove(&service_id) {
Some(w) => w,
None => {
warn!(
"Service ID {:?}-{} in startup order but not found",
service_id, service.name
);
continue;
}
};
let threads = wrapper.service.threads().unwrap_or(conf.threads);
let name = wrapper.service.name().to_string();
let dependencies = self
.dependencies
.lock()
.get_dependencies(wrapper.service_handle.id);
let ready_notifier = wrapper
.ready_notifier
.take()
.expect("Service notifier should exist");
if !dependencies.is_empty() {
info!(
"Service '{name}' will wait for dependencies: {:?}",
dependencies.iter().map(|s| &s.name).collect::<Vec<_>>()
);
} else {
info!("Starting service: {}", name);
}
let dependency_watches = dependencies
.iter()
.map(|s| s.ready_watch.clone())
.collect::<Vec<_>>();
let runtime = Server::run_service(
wrapper.service,
#[cfg(unix)]
self.listen_fds(),
self.shutdown_recv.clone(),
threads,
conf.work_stealing,
self.configuration.listener_tasks_per_fd,
ready_notifier,
dependency_watches,
);
runtimes.push((runtime, name));
}
let server_runtime = Server::create_runtime("Server", 1, true);
#[cfg(unix)]
let shutdown_type = server_runtime
.get_handle()
.block_on(self.main_loop(run_args));
#[cfg(windows)]
let shutdown_type = server_runtime
.get_handle()
.block_on(self.main_loop(run_args));
self.execution_phase_watch
.send(ExecutionPhase::ShutdownStarted)
.ok();
if matches!(shutdown_type, ShutdownType::Graceful) {
self.execution_phase_watch
.send(ExecutionPhase::ShutdownGracePeriod)
.ok();
let exit_timeout = self
.configuration
.as_ref()
.grace_period_seconds
.unwrap_or(EXIT_TIMEOUT);
info!("Graceful shutdown: grace period {}s starts", exit_timeout);
thread::sleep(Duration::from_secs(exit_timeout));
info!("Graceful shutdown: grace period ends");
}
let shutdown_timeout = match shutdown_type {
ShutdownType::Quick => Duration::from_secs(0),
ShutdownType::Graceful => Duration::from_secs(
self.configuration
.as_ref()
.graceful_shutdown_timeout_seconds
.unwrap_or(5),
),
};
self.execution_phase_watch
.send(ExecutionPhase::ShutdownRuntimes)
.ok();
let shutdowns: Vec<_> = runtimes
.into_iter()
.map(|(rt, name)| {
info!("Waiting for runtimes to exit!");
let join = thread::spawn(move || {
rt.shutdown_timeout(shutdown_timeout);
thread::sleep(shutdown_timeout)
});
(join, name)
})
.collect();
for (shutdown, name) in shutdowns {
info!("Waiting for service runtime {} to exit", name);
if let Err(e) = shutdown.join() {
error!("Failed to shutdown service runtime {}: {:?}", name, e);
}
debug!("Service runtime {} has exited", name);
}
info!("All runtimes exited, exiting now");
self.execution_phase_watch
.send(ExecutionPhase::Terminated)
.ok();
}
fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
if work_steal {
Runtime::new_steal(threads, name)
} else {
Runtime::new_no_steal(threads, name)
}
}
}