1pub mod configuration;
18#[cfg(unix)]
19mod daemon;
20#[cfg(unix)]
21pub(crate) mod transfer_fd;
22
23use async_trait::async_trait;
24#[cfg(unix)]
25use daemon::daemonize;
26use log::{debug, error, info, warn};
27use pingora_runtime::Runtime;
28use pingora_timeout::fast_timeout;
29#[cfg(feature = "sentry")]
30use sentry::ClientOptions;
31use std::sync::Arc;
32use std::thread;
33#[cfg(unix)]
34use tokio::signal::unix;
35use tokio::sync::{broadcast, watch, Mutex};
36use tokio::time::{sleep, Duration};
37
38use crate::services::Service;
39use configuration::{Opt, ServerConf};
40#[cfg(unix)]
41pub use transfer_fd::Fds;
42
43use pingora_error::{Error, ErrorType, Result};
44
45const EXIT_TIMEOUT: u64 = 60 * 5;
48const CLOSE_TIMEOUT: u64 = 5;
51
52enum ShutdownType {
53 Graceful,
54 Quick,
55}
56
57#[derive(Clone, Debug)]
59#[non_exhaustive]
60pub enum ExecutionPhase {
61 Setup,
63
64 Bootstrap,
68
69 BootstrapComplete,
71
72 Running,
74
75 GracefulUpgradeTransferringFds,
79
80 GracefulUpgradeCloseTimeout,
83
84 GracefulTerminate,
86
87 ShutdownStarted,
89
90 ShutdownGracePeriod,
92
93 ShutdownRuntimes,
95
96 Terminated,
98}
99
100pub type ShutdownWatch = watch::Receiver<bool>;
103#[cfg(unix)]
104pub type ListenFds = Arc<Mutex<Fds>>;
105
106#[derive(Debug)]
108pub enum ShutdownSignal {
109 GracefulUpgrade,
112 GracefulTerminate,
115 FastShutdown,
117}
118
119#[async_trait]
122pub trait ShutdownSignalWatch {
123 async fn recv(&self) -> ShutdownSignal;
125}
126
127#[cfg(unix)]
133pub struct UnixShutdownSignalWatch;
134
135#[cfg(unix)]
136#[async_trait]
137impl ShutdownSignalWatch for UnixShutdownSignalWatch {
138 async fn recv(&self) -> ShutdownSignal {
139 let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
140 let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
141 let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
142
143 tokio::select! {
144 _ = graceful_upgrade_signal.recv() => {
145 ShutdownSignal::GracefulUpgrade
146 },
147 _ = graceful_terminate_signal.recv() => {
148 ShutdownSignal::GracefulTerminate
149 },
150 _ = fast_shutdown_signal.recv() => {
151 ShutdownSignal::FastShutdown
152 },
153 }
154 }
155}
156
157pub struct RunArgs {
159 #[cfg(unix)]
161 pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
162}
163
164impl Default for RunArgs {
165 #[cfg(unix)]
166 fn default() -> Self {
167 Self {
168 shutdown_signal: Box::new(UnixShutdownSignalWatch),
169 }
170 }
171
172 #[cfg(windows)]
173 fn default() -> Self {
174 Self {}
175 }
176}
177
178pub struct Server {
184 services: Vec<Box<dyn Service>>,
185 #[cfg(unix)]
186 listen_fds: Option<ListenFds>,
187 shutdown_watch: watch::Sender<bool>,
188 shutdown_recv: ShutdownWatch,
190
191 execution_phase_watch: broadcast::Sender<ExecutionPhase>,
195
196 pub configuration: Arc<ServerConf>,
198 pub options: Option<Opt>,
200 #[cfg(feature = "sentry")]
201 #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
202 pub sentry: Option<ClientOptions>,
206}
207
208impl Server {
211 pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
215 self.execution_phase_watch.subscribe()
216 }
217
218 #[cfg(unix)]
219 async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
220 self.execution_phase_watch
223 .send(ExecutionPhase::Running)
224 .ok();
225
226 match run_args.shutdown_signal.recv().await {
227 ShutdownSignal::FastShutdown => {
228 info!("SIGINT received, exiting");
229 ShutdownType::Quick
230 }
231 ShutdownSignal::GracefulTerminate => {
232 info!("SIGTERM received, gracefully exiting");
234 info!("Broadcasting graceful shutdown");
236 match self.shutdown_watch.send(true) {
237 Ok(_) => {
238 info!("Graceful shutdown started!");
239 }
240 Err(e) => {
241 error!("Graceful shutdown broadcast failed: {e}");
242 }
243 }
244 info!("Broadcast graceful shutdown complete");
245
246 self.execution_phase_watch
247 .send(ExecutionPhase::GracefulTerminate)
248 .ok();
249
250 ShutdownType::Graceful
251 }
252 ShutdownSignal::GracefulUpgrade => {
253 info!("SIGQUIT received, sending socks and gracefully exiting");
256
257 self.execution_phase_watch
258 .send(ExecutionPhase::GracefulUpgradeTransferringFds)
259 .ok();
260
261 if let Some(fds) = &self.listen_fds {
262 let fds = fds.lock().await;
263 info!("Trying to send socks");
264 match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
266 Ok(_) => {
267 info!("listener sockets sent");
268 }
269 Err(e) => {
270 error!("Unable to send listener sockets to new process: {e}");
271 #[cfg(all(not(debug_assertions), feature = "sentry"))]
273 sentry::capture_error(&e);
274 }
275 }
276 self.execution_phase_watch
277 .send(ExecutionPhase::GracefulUpgradeCloseTimeout)
278 .ok();
279 sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
280 info!("Broadcasting graceful shutdown");
281 match self.shutdown_watch.send(true) {
283 Ok(_) => {
284 info!("Graceful shutdown started!");
285 }
286 Err(e) => {
287 error!("Graceful shutdown broadcast failed: {e}");
288 return ShutdownType::Graceful;
290 }
291 }
292 info!("Broadcast graceful shutdown complete");
293 ShutdownType::Graceful
294 } else {
295 info!("No socks to send, shutting down.");
296 ShutdownType::Graceful
297 }
298 }
299 }
300 }
301
302 fn run_service(
303 mut service: Box<dyn Service>,
304 #[cfg(unix)] fds: Option<ListenFds>,
305 shutdown: ShutdownWatch,
306 threads: usize,
307 work_stealing: bool,
308 listeners_per_fd: usize,
309 ) -> Runtime
310{
313 let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
314 service_runtime.get_handle().spawn(async move {
315 service
316 .start_service(
317 #[cfg(unix)]
318 fds,
319 shutdown,
320 listeners_per_fd,
321 )
322 .await;
323 info!("service exited.")
324 });
325 service_runtime
326 }
327
328 #[cfg(unix)]
329 fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
330 let mut fds = Fds::new();
331 if upgrade {
332 debug!("Trying to receive socks");
333 fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
334 }
335 self.listen_fds = Some(Arc::new(Mutex::new(fds)));
336 Ok(())
337 }
338
339 pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
347 let opt = raw_opt.into();
348 if let Some(opts) = &opt {
349 if let Some(c) = opts.conf.as_ref() {
350 warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
351 }
352 conf.merge_with_opt(opts);
353 }
354
355 let (tx, rx) = watch::channel(false);
356
357 Server {
358 services: vec![],
359 #[cfg(unix)]
360 listen_fds: None,
361 shutdown_watch: tx,
362 shutdown_recv: rx,
363 execution_phase_watch: broadcast::channel(100).0,
364 configuration: Arc::new(conf),
365 options: opt,
366 #[cfg(feature = "sentry")]
367 sentry: None,
368 }
369 }
370
371 pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
379 let opt = opt.into();
380 let (tx, rx) = watch::channel(false);
381
382 let conf = if let Some(opt) = opt.as_ref() {
383 opt.conf.as_ref().map_or_else(
384 || {
385 ServerConf::new_with_opt_override(opt).ok_or_else(|| {
387 Error::explain(ErrorType::ReadError, "Conf generation failed")
388 })
389 },
390 |_| {
391 ServerConf::load_yaml_with_opt_override(opt)
393 },
394 )
395 } else {
396 ServerConf::new()
397 .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
398 }?;
399
400 Ok(Server {
401 services: vec![],
402 #[cfg(unix)]
403 listen_fds: None,
404 shutdown_watch: tx,
405 shutdown_recv: rx,
406 execution_phase_watch: broadcast::channel(100).0,
407 configuration: Arc::new(conf),
408 options: opt,
409 #[cfg(feature = "sentry")]
410 sentry: None,
411 })
412 }
413
414 pub fn add_service(&mut self, service: impl Service + 'static) {
418 self.services.push(Box::new(service));
419 }
420
421 pub fn add_services(&mut self, services: Vec<Box<dyn Service>>) {
423 self.services.extend(services);
424 }
425
426 pub fn bootstrap(&mut self) {
431 info!("Bootstrap starting");
432 debug!("{:#?}", self.options);
433
434 self.execution_phase_watch
435 .send(ExecutionPhase::Bootstrap)
436 .ok();
437
438 #[cfg(all(not(debug_assertions), feature = "sentry"))]
440 let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
441
442 if self.options.as_ref().is_some_and(|o| o.test) {
443 info!("Server Test passed, exiting");
444 std::process::exit(0);
445 }
446
447 #[cfg(unix)]
449 match self.load_fds(self.options.as_ref().is_some_and(|o| o.upgrade)) {
450 Ok(_) => {
451 info!("Bootstrap done");
452 }
453 Err(e) => {
454 #[cfg(all(not(debug_assertions), feature = "sentry"))]
456 sentry::capture_error(&e);
457
458 error!("Bootstrap failed on error: {:?}, exiting.", e);
459 std::process::exit(1);
460 }
461 }
462
463 self.execution_phase_watch
464 .send(ExecutionPhase::BootstrapComplete)
465 .ok();
466 }
467
468 pub fn run_forever(self) -> ! {
476 info!("Server starting");
477
478 self.run(RunArgs::default());
479
480 info!("All runtimes exited, exiting now");
481 std::process::exit(0)
482 }
483
484 pub fn run(mut self, run_args: RunArgs) {
495 info!("Server starting");
496
497 let conf = self.configuration.as_ref();
498
499 #[cfg(unix)]
500 if conf.daemon {
501 info!("Daemonizing the server");
502 fast_timeout::pause_for_fork();
503 daemonize(&self.configuration);
504 fast_timeout::unpause();
505 }
506
507 #[cfg(windows)]
508 if conf.daemon {
509 panic!("Daemonizing under windows is not supported");
510 }
511
512 #[cfg(all(not(debug_assertions), feature = "sentry"))]
514 let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
515
516 let mut runtimes: Vec<(Runtime, String)> = Vec::new();
518
519 while let Some(service) = self.services.pop() {
520 let threads = service.threads().unwrap_or(conf.threads);
521 let name = service.name().to_string();
522 let runtime = Server::run_service(
523 service,
524 #[cfg(unix)]
525 self.listen_fds.clone(),
526 self.shutdown_recv.clone(),
527 threads,
528 conf.work_stealing,
529 self.configuration.listener_tasks_per_fd,
530 );
531 runtimes.push((runtime, name));
532 }
533
534 let server_runtime = Server::create_runtime("Server", 1, true);
537 #[cfg(unix)]
538 let shutdown_type = server_runtime
539 .get_handle()
540 .block_on(self.main_loop(run_args));
541 #[cfg(windows)]
542 let shutdown_type = ShutdownType::Graceful;
543
544 self.execution_phase_watch
545 .send(ExecutionPhase::ShutdownStarted)
546 .ok();
547
548 if matches!(shutdown_type, ShutdownType::Graceful) {
549 self.execution_phase_watch
550 .send(ExecutionPhase::ShutdownGracePeriod)
551 .ok();
552
553 let exit_timeout = self
554 .configuration
555 .as_ref()
556 .grace_period_seconds
557 .unwrap_or(EXIT_TIMEOUT);
558 info!("Graceful shutdown: grace period {}s starts", exit_timeout);
559 thread::sleep(Duration::from_secs(exit_timeout));
560 info!("Graceful shutdown: grace period ends");
561 }
562
563 let shutdown_timeout = match shutdown_type {
565 ShutdownType::Quick => Duration::from_secs(0),
566 ShutdownType::Graceful => Duration::from_secs(
567 self.configuration
568 .as_ref()
569 .graceful_shutdown_timeout_seconds
570 .unwrap_or(5),
571 ),
572 };
573
574 self.execution_phase_watch
575 .send(ExecutionPhase::ShutdownRuntimes)
576 .ok();
577
578 let shutdowns: Vec<_> = runtimes
579 .into_iter()
580 .map(|(rt, name)| {
581 info!("Waiting for runtimes to exit!");
582 let join = thread::spawn(move || {
583 rt.shutdown_timeout(shutdown_timeout);
584 thread::sleep(shutdown_timeout)
585 });
586 (join, name)
587 })
588 .collect();
589 for (shutdown, name) in shutdowns {
590 info!("Waiting for service runtime {} to exit", name);
591 if let Err(e) = shutdown.join() {
592 error!("Failed to shutdown service runtime {}: {:?}", name, e);
593 }
594 debug!("Service runtime {} has exited", name);
595 }
596 info!("All runtimes exited, exiting now");
597
598 self.execution_phase_watch
599 .send(ExecutionPhase::Terminated)
600 .ok();
601 }
602
603 fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
604 if work_steal {
605 Runtime::new_steal(threads, name)
606 } else {
607 Runtime::new_no_steal(threads, name)
608 }
609 }
610}