1pub mod configuration;
18#[cfg(unix)]
19mod daemon;
20#[cfg(unix)]
21pub(crate) mod transfer_fd;
22
23use async_trait::async_trait;
24#[cfg(unix)]
25use daemon::daemonize;
26use log::{debug, error, info, warn};
27use pingora_runtime::Runtime;
28use pingora_timeout::fast_timeout;
29#[cfg(feature = "sentry")]
30use sentry::ClientOptions;
31use std::sync::Arc;
32use std::thread;
33#[cfg(unix)]
34use tokio::signal::unix;
35use tokio::sync::{watch, Mutex};
36use tokio::time::{sleep, Duration};
37
38use crate::services::Service;
39use configuration::{Opt, ServerConf};
40#[cfg(unix)]
41pub use transfer_fd::Fds;
42
43use pingora_error::{Error, ErrorType, Result};
44
/// Fallback grace period, in seconds, that a graceful shutdown waits before the
/// runtimes are torn down. Used when `grace_period_seconds` is not configured.
const EXIT_TIMEOUT: u64 = 5 * 60;
/// Seconds to wait after attempting to hand the listening sockets to a new
/// process, before the graceful shutdown is broadcast.
const CLOSE_TIMEOUT: u64 = 5;
51
/// How the server stops once its main loop has returned.
enum ShutdownType {
    /// Wait out the grace period, then give each runtime the graceful shutdown
    /// timeout to finish.
    Graceful,
    /// Tear the runtimes down immediately, with a zero timeout.
    Quick,
}
56
57pub type ShutdownWatch = watch::Receiver<bool>;
60#[cfg(unix)]
61pub type ListenFds = Arc<Mutex<Fds>>;
62
/// The kinds of shutdown request the server's main loop reacts to.
#[derive(Debug)]
pub enum ShutdownSignal {
    /// Hand the listening sockets over to a new process, then shut down
    /// gracefully.
    GracefulUpgrade,
    /// Broadcast a graceful shutdown to the services and exit after the grace
    /// period.
    GracefulTerminate,
    /// Exit right away, without a grace period.
    FastShutdown,
}
75
76#[async_trait]
79pub trait ShutdownSignalWatch {
80 async fn recv(&self) -> ShutdownSignal;
82}
83
84#[cfg(unix)]
90pub struct UnixShutdownSignalWatch;
91
92#[cfg(unix)]
93#[async_trait]
94impl ShutdownSignalWatch for UnixShutdownSignalWatch {
95 async fn recv(&self) -> ShutdownSignal {
96 let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
97 let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
98 let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
99
100 tokio::select! {
101 _ = graceful_upgrade_signal.recv() => {
102 ShutdownSignal::GracefulUpgrade
103 },
104 _ = graceful_terminate_signal.recv() => {
105 ShutdownSignal::GracefulTerminate
106 },
107 _ = fast_shutdown_signal.recv() => {
108 ShutdownSignal::FastShutdown
109 },
110 }
111 }
112}
113
114pub struct RunArgs {
116 #[cfg(unix)]
118 pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
119}
120
121impl Default for RunArgs {
122 #[cfg(unix)]
123 fn default() -> Self {
124 Self {
125 shutdown_signal: Box::new(UnixShutdownSignalWatch),
126 }
127 }
128
129 #[cfg(windows)]
130 fn default() -> Self {
131 Self {}
132 }
133}
134
135pub struct Server {
141 services: Vec<Box<dyn Service>>,
142 #[cfg(unix)]
143 listen_fds: Option<ListenFds>,
144 shutdown_watch: watch::Sender<bool>,
145 shutdown_recv: ShutdownWatch,
147 pub configuration: Arc<ServerConf>,
149 pub options: Option<Opt>,
151 #[cfg(feature = "sentry")]
152 #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
153 pub sentry: Option<ClientOptions>,
157}
158
159impl Server {
162 #[cfg(unix)]
163 async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
164 match run_args.shutdown_signal.recv().await {
166 ShutdownSignal::FastShutdown => {
167 info!("SIGINT received, exiting");
168 ShutdownType::Quick
169 }
170 ShutdownSignal::GracefulTerminate => {
171 info!("SIGTERM received, gracefully exiting");
173 info!("Broadcasting graceful shutdown");
175 match self.shutdown_watch.send(true) {
176 Ok(_) => {
177 info!("Graceful shutdown started!");
178 }
179 Err(e) => {
180 error!("Graceful shutdown broadcast failed: {e}");
181 }
182 }
183 info!("Broadcast graceful shutdown complete");
184 ShutdownType::Graceful
185 }
186 ShutdownSignal::GracefulUpgrade => {
187 info!("SIGQUIT received, sending socks and gracefully exiting");
190 if let Some(fds) = &self.listen_fds {
191 let fds = fds.lock().await;
192 info!("Trying to send socks");
193 match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
195 Ok(_) => {
196 info!("listener sockets sent");
197 }
198 Err(e) => {
199 error!("Unable to send listener sockets to new process: {e}");
200 #[cfg(all(not(debug_assertions), feature = "sentry"))]
202 sentry::capture_error(&e);
203 }
204 }
205 sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
206 info!("Broadcasting graceful shutdown");
207 match self.shutdown_watch.send(true) {
209 Ok(_) => {
210 info!("Graceful shutdown started!");
211 }
212 Err(e) => {
213 error!("Graceful shutdown broadcast failed: {e}");
214 return ShutdownType::Graceful;
216 }
217 }
218 info!("Broadcast graceful shutdown complete");
219 ShutdownType::Graceful
220 } else {
221 info!("No socks to send, shutting down.");
222 ShutdownType::Graceful
223 }
224 }
225 }
226 }
227
228 fn run_service(
229 mut service: Box<dyn Service>,
230 #[cfg(unix)] fds: Option<ListenFds>,
231 shutdown: ShutdownWatch,
232 threads: usize,
233 work_stealing: bool,
234 listeners_per_fd: usize,
235 ) -> Runtime
236{
239 let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
240 service_runtime.get_handle().spawn(async move {
241 service
242 .start_service(
243 #[cfg(unix)]
244 fds,
245 shutdown,
246 listeners_per_fd,
247 )
248 .await;
249 info!("service exited.")
250 });
251 service_runtime
252 }
253
254 #[cfg(unix)]
255 fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
256 let mut fds = Fds::new();
257 if upgrade {
258 debug!("Trying to receive socks");
259 fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
260 }
261 self.listen_fds = Some(Arc::new(Mutex::new(fds)));
262 Ok(())
263 }
264
265 pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
273 let opt = raw_opt.into();
274 if let Some(opts) = &opt {
275 if let Some(c) = opts.conf.as_ref() {
276 warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
277 }
278 conf.merge_with_opt(opts);
279 }
280
281 let (tx, rx) = watch::channel(false);
282
283 Server {
284 services: vec![],
285 #[cfg(unix)]
286 listen_fds: None,
287 shutdown_watch: tx,
288 shutdown_recv: rx,
289 configuration: Arc::new(conf),
290 options: opt,
291 #[cfg(feature = "sentry")]
292 sentry: None,
293 }
294 }
295
296 pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
304 let opt = opt.into();
305 let (tx, rx) = watch::channel(false);
306
307 let conf = if let Some(opt) = opt.as_ref() {
308 opt.conf.as_ref().map_or_else(
309 || {
310 ServerConf::new_with_opt_override(opt).ok_or_else(|| {
312 Error::explain(ErrorType::ReadError, "Conf generation failed")
313 })
314 },
315 |_| {
316 ServerConf::load_yaml_with_opt_override(opt)
318 },
319 )
320 } else {
321 ServerConf::new()
322 .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
323 }?;
324
325 Ok(Server {
326 services: vec![],
327 #[cfg(unix)]
328 listen_fds: None,
329 shutdown_watch: tx,
330 shutdown_recv: rx,
331 configuration: Arc::new(conf),
332 options: opt,
333 #[cfg(feature = "sentry")]
334 sentry: None,
335 })
336 }
337
338 pub fn add_service(&mut self, service: impl Service + 'static) {
342 self.services.push(Box::new(service));
343 }
344
345 pub fn add_services(&mut self, services: Vec<Box<dyn Service>>) {
347 self.services.extend(services);
348 }
349
350 pub fn bootstrap(&mut self) {
355 info!("Bootstrap starting");
356 debug!("{:#?}", self.options);
357
358 #[cfg(all(not(debug_assertions), feature = "sentry"))]
360 let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
361
362 if self.options.as_ref().is_some_and(|o| o.test) {
363 info!("Server Test passed, exiting");
364 std::process::exit(0);
365 }
366
367 #[cfg(unix)]
369 match self.load_fds(self.options.as_ref().is_some_and(|o| o.upgrade)) {
370 Ok(_) => {
371 info!("Bootstrap done");
372 }
373 Err(e) => {
374 #[cfg(all(not(debug_assertions), feature = "sentry"))]
376 sentry::capture_error(&e);
377
378 error!("Bootstrap failed on error: {:?}, exiting.", e);
379 std::process::exit(1);
380 }
381 }
382 }
383
384 pub fn run_forever(self) -> ! {
386 info!("Server starting");
387
388 self.run(RunArgs::default());
389
390 info!("All runtimes exited, exiting now");
391 std::process::exit(0)
392 }
393
394 pub fn run(mut self, run_args: RunArgs) {
402 info!("Server starting");
403
404 let conf = self.configuration.as_ref();
405
406 #[cfg(unix)]
407 if conf.daemon {
408 info!("Daemonizing the server");
409 fast_timeout::pause_for_fork();
410 daemonize(&self.configuration);
411 fast_timeout::unpause();
412 }
413
414 #[cfg(windows)]
415 if conf.daemon {
416 panic!("Daemonizing under windows is not supported");
417 }
418
419 #[cfg(all(not(debug_assertions), feature = "sentry"))]
421 let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
422
423 let mut runtimes: Vec<Runtime> = Vec::new();
424
425 while let Some(service) = self.services.pop() {
426 let threads = service.threads().unwrap_or(conf.threads);
427 let runtime = Server::run_service(
428 service,
429 #[cfg(unix)]
430 self.listen_fds.clone(),
431 self.shutdown_recv.clone(),
432 threads,
433 conf.work_stealing,
434 self.configuration.listener_tasks_per_fd,
435 );
436 runtimes.push(runtime);
437 }
438
439 let server_runtime = Server::create_runtime("Server", 1, true);
442 #[cfg(unix)]
443 let shutdown_type = server_runtime
444 .get_handle()
445 .block_on(self.main_loop(run_args));
446 #[cfg(windows)]
447 let shutdown_type = ShutdownType::Graceful;
448
449 if matches!(shutdown_type, ShutdownType::Graceful) {
450 let exit_timeout = self
451 .configuration
452 .as_ref()
453 .grace_period_seconds
454 .unwrap_or(EXIT_TIMEOUT);
455 info!("Graceful shutdown: grace period {}s starts", exit_timeout);
456 thread::sleep(Duration::from_secs(exit_timeout));
457 info!("Graceful shutdown: grace period ends");
458 }
459
460 let shutdown_timeout = match shutdown_type {
462 ShutdownType::Quick => Duration::from_secs(0),
463 ShutdownType::Graceful => Duration::from_secs(
464 self.configuration
465 .as_ref()
466 .graceful_shutdown_timeout_seconds
467 .unwrap_or(5),
468 ),
469 };
470 let shutdowns: Vec<_> = runtimes
471 .into_iter()
472 .map(|rt| {
473 info!("Waiting for runtimes to exit!");
474 thread::spawn(move || {
475 rt.shutdown_timeout(shutdown_timeout);
476 thread::sleep(shutdown_timeout)
477 })
478 })
479 .collect();
480 for shutdown in shutdowns {
481 if let Err(e) = shutdown.join() {
482 error!("Failed to shutdown runtime: {:?}", e);
483 }
484 }
485 }
486
487 fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
488 if work_steal {
489 Runtime::new_steal(threads, name)
490 } else {
491 Runtime::new_no_steal(threads, name)
492 }
493 }
494}