1mod bootstrap_services;
18pub mod configuration;
19#[cfg(unix)]
20mod daemon;
21#[cfg(unix)]
22pub(crate) mod transfer_fd;
23
24use async_trait::async_trait;
25#[cfg(unix)]
26use daemon::daemonize;
27use daggy::NodeIndex;
28use log::{debug, error, info, warn};
29use parking_lot::Mutex;
30use pingora_runtime::Runtime;
31use pingora_timeout::fast_timeout;
32#[cfg(feature = "sentry")]
33use sentry::ClientOptions;
34use std::sync::Arc;
35use std::thread;
36use std::time::SystemTime;
37#[cfg(unix)]
38use tokio::signal::unix;
39use tokio::sync::{broadcast, watch, Mutex as TokioMutex};
40use tokio::time::{sleep, Duration};
41
42use crate::prelude::background_service;
43use crate::server::bootstrap_services::{Bootstrap, BootstrapService, SentryInitService};
44use crate::services::{
45 DependencyGraph, ServiceHandle, ServiceReadyNotifier, ServiceReadyWatch, ServiceWithDependents,
46};
47use configuration::{Opt, ServerConf};
48use std::collections::HashMap;
49#[cfg(unix)]
50pub use transfer_fd::Fds;
51
52use pingora_error::{Error, ErrorType, Result};
53
/// Default grace period in seconds (five minutes) between broadcasting a graceful
/// shutdown and tearing down service runtimes; used when the configuration does not
/// set `grace_period_seconds`.
const EXIT_TIMEOUT: u64 = 5 * 60;
/// Seconds to wait after handing listener sockets to the upgraded process before
/// broadcasting a graceful shutdown to this process's services.
const CLOSE_TIMEOUT: u64 = 5;
60
/// How the server winds down once its main loop returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownType {
    /// Wait out the grace period, then give each service runtime
    /// `graceful_shutdown_timeout_seconds` to stop.
    Graceful,
    /// Skip the grace period and shut service runtimes down with a zero timeout.
    Quick,
}
65
66pub(crate) struct ServiceWrapper {
68 ready_notifier: Option<ServiceReadyNotifier>,
69 service: Box<dyn ServiceWithDependents>,
70 service_handle: ServiceHandle,
71}
72
/// Lifecycle phases broadcast by the server; subscribe with
/// `Server::watch_execution_phase`.
///
/// Derives `PartialEq`/`Eq` so subscribers can compare received phases directly.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExecutionPhase {
    /// Initial setup phase. Not emitted from the server module itself;
    /// presumably reported by `Bootstrap`, which receives the phase sender — verify.
    Setup,

    /// Bootstrap in progress. Presumably emitted by `Bootstrap` — verify.
    Bootstrap,

    /// Bootstrap finished. Presumably emitted by `Bootstrap` — verify.
    BootstrapComplete,

    /// The server main loop has started and is waiting for a shutdown signal.
    Running,

    /// A graceful upgrade was requested; listener sockets are being handed over.
    GracefulUpgradeTransferringFds,

    /// Listener sockets were handed over. The server now waits `CLOSE_TIMEOUT`
    /// seconds before telling services to shut down.
    GracefulUpgradeCloseTimeout,

    /// A graceful terminate was requested and shutdown has been broadcast to services.
    GracefulTerminate,

    /// The main loop has returned and shutdown has begun.
    ShutdownStarted,

    /// Graceful shutdown only: the grace period is being waited out.
    ShutdownGracePeriod,

    /// Service runtimes are being shut down.
    ShutdownRuntimes,

    /// All service runtimes have exited.
    Terminated,
}
115
116pub type ShutdownWatch = watch::Receiver<bool>;
119#[cfg(unix)]
120pub type ListenFds = Arc<TokioMutex<Fds>>;
121
/// A shutdown request observed by a [`ShutdownSignalWatch`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownSignal {
    /// Hand listener sockets to a new process, then shut down gracefully
    /// (SIGQUIT with the Unix watcher).
    GracefulUpgrade,
    /// Shut down gracefully without handing over sockets (SIGTERM with the Unix watcher).
    GracefulTerminate,
    /// Exit without a grace period (SIGINT with the Unix watcher).
    FastShutdown,
}
134
135#[async_trait]
138pub trait ShutdownSignalWatch {
139 async fn recv(&self) -> ShutdownSignal;
141}
142
/// The default [`ShutdownSignalWatch`] on Unix, driven by process signals:
/// SIGQUIT requests a graceful upgrade, SIGTERM a graceful terminate, and SIGINT a
/// fast shutdown.
#[cfg(unix)]
#[derive(Debug, Clone, Copy, Default)]
pub struct UnixShutdownSignalWatch;
150
151#[cfg(unix)]
152#[async_trait]
153impl ShutdownSignalWatch for UnixShutdownSignalWatch {
154 async fn recv(&self) -> ShutdownSignal {
155 let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
156 let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
157 let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
158
159 tokio::select! {
160 _ = graceful_upgrade_signal.recv() => {
161 ShutdownSignal::GracefulUpgrade
162 },
163 _ = graceful_terminate_signal.recv() => {
164 ShutdownSignal::GracefulTerminate
165 },
166 _ = fast_shutdown_signal.recv() => {
167 ShutdownSignal::FastShutdown
168 },
169 }
170 }
171}
172
173pub struct RunArgs {
175 #[cfg(unix)]
177 pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
178}
179
180impl Default for RunArgs {
181 #[cfg(unix)]
182 fn default() -> Self {
183 Self {
184 shutdown_signal: Box::new(UnixShutdownSignalWatch),
185 }
186 }
187
188 #[cfg(windows)]
189 fn default() -> Self {
190 Self {}
191 }
192}
193
194pub struct Server {
200 init_services: Vec<Box<dyn ServiceWithDependents + 'static>>,
203
204 services: HashMap<NodeIndex, ServiceWrapper>,
205 shutdown_watch: watch::Sender<bool>,
206 shutdown_recv: ShutdownWatch,
208
209 execution_phase_watch: broadcast::Sender<ExecutionPhase>,
213
214 dependencies: Arc<Mutex<DependencyGraph>>,
216
217 bootstrap: Arc<Mutex<Bootstrap>>,
219
220 pub configuration: Arc<ServerConf>,
222 pub options: Option<Opt>,
224}
225
226impl Server {
229 pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
233 self.execution_phase_watch.subscribe()
234 }
235
236 #[cfg(unix)]
237 async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
238 self.execution_phase_watch
241 .send(ExecutionPhase::Running)
242 .ok();
243
244 match run_args.shutdown_signal.recv().await {
245 ShutdownSignal::FastShutdown => {
246 info!("SIGINT received, exiting");
247 ShutdownType::Quick
248 }
249 ShutdownSignal::GracefulTerminate => {
250 info!("SIGTERM received, gracefully exiting");
252 info!("Broadcasting graceful shutdown");
254 match self.shutdown_watch.send(true) {
255 Ok(_) => {
256 info!("Graceful shutdown started!");
257 }
258 Err(e) => {
259 error!("Graceful shutdown broadcast failed: {e}");
260 }
261 }
262 info!("Broadcast graceful shutdown complete");
263
264 self.execution_phase_watch
265 .send(ExecutionPhase::GracefulTerminate)
266 .ok();
267
268 ShutdownType::Graceful
269 }
270 ShutdownSignal::GracefulUpgrade => {
271 info!("SIGQUIT received, sending socks and gracefully exiting");
274
275 self.execution_phase_watch
276 .send(ExecutionPhase::GracefulUpgradeTransferringFds)
277 .ok();
278
279 if let Some(fds) = self.listen_fds() {
280 let fds = fds.lock().await;
281 info!("Trying to send socks");
282 match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
284 Ok(_) => {
285 info!("listener sockets sent");
286 }
287 Err(e) => {
288 error!("Unable to send listener sockets to new process: {e}");
289 #[cfg(all(not(debug_assertions), feature = "sentry"))]
291 sentry::capture_error(&e);
292 }
293 }
294 self.execution_phase_watch
295 .send(ExecutionPhase::GracefulUpgradeCloseTimeout)
296 .ok();
297 sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
298 info!("Broadcasting graceful shutdown");
299 match self.shutdown_watch.send(true) {
301 Ok(_) => {
302 info!("Graceful shutdown started!");
303 }
304 Err(e) => {
305 error!("Graceful shutdown broadcast failed: {e}");
306 return ShutdownType::Graceful;
308 }
309 }
310 info!("Broadcast graceful shutdown complete");
311 ShutdownType::Graceful
312 } else {
313 info!("No socks to send, shutting down.");
314 ShutdownType::Graceful
315 }
316 }
317 }
318 }
319
320 #[cfg(windows)]
321 async fn main_loop(&self, _run_args: RunArgs) -> ShutdownType {
322 self.execution_phase_watch
325 .send(ExecutionPhase::Running)
326 .ok();
327
328 match tokio::signal::ctrl_c().await {
329 Ok(()) => {
330 info!("Ctrl+C received, gracefully exiting");
331 info!("Broadcasting graceful shutdown");
333 match self.shutdown_watch.send(true) {
334 Ok(_) => {
335 info!("Graceful shutdown started!");
336 }
337 Err(e) => {
338 error!("Graceful shutdown broadcast failed: {e}");
339 }
340 }
341 info!("Broadcast graceful shutdown complete");
342
343 self.execution_phase_watch
344 .send(ExecutionPhase::GracefulTerminate)
345 .ok();
346
347 ShutdownType::Graceful
348 }
349 Err(e) => {
350 error!("Unable to listen for shutdown signal: {}", e);
351 ShutdownType::Quick
352 }
353 }
354 }
355
356 #[cfg(feature = "sentry")]
357 #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
358 pub fn set_sentry_config(&mut self, sentry_config: ClientOptions) {
362 self.bootstrap.lock().set_sentry_config(Some(sentry_config));
363 }
364
365 #[cfg(unix)]
367 fn listen_fds(&self) -> Option<ListenFds> {
368 self.bootstrap.lock().get_fds()
369 }
370
371 #[allow(clippy::too_many_arguments)]
372 fn run_service(
373 mut service: Box<dyn ServiceWithDependents>,
374 #[cfg(unix)] fds: Option<ListenFds>,
375 shutdown: ShutdownWatch,
376 threads: usize,
377 work_stealing: bool,
378 listeners_per_fd: usize,
379 ready_notifier: ServiceReadyNotifier,
380 dependency_watches: Vec<ServiceReadyWatch>,
381 ) -> Runtime
382{
385 let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
386 let service_name = service.name().to_string();
387 service_runtime.get_handle().spawn(async move {
388 let mut time_waited_opt: Option<Duration> = None;
390 for mut watch in dependency_watches {
391 let start = SystemTime::now();
392
393 if watch.wait_for(|&ready| ready).await.is_err() {
394 error!(
395 "Service '{}' dependency channel closed before ready",
396 service_name
397 );
398 }
399
400 *time_waited_opt.get_or_insert_default() += start.elapsed().unwrap_or_default()
401 }
402
403 if let Some(time_waited) = time_waited_opt {
404 service.on_startup_delay(time_waited);
405 }
406
407 service
409 .start_service(
410 #[cfg(unix)]
411 fds,
412 shutdown,
413 listeners_per_fd,
414 ready_notifier,
415 )
416 .await;
417 info!("service '{}' exited.", service_name);
418 });
419 service_runtime
420 }
421
422 pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
430 let opt = raw_opt.into();
431 if let Some(opts) = &opt {
432 if let Some(c) = opts.conf.as_ref() {
433 warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
434 }
435 conf.merge_with_opt(opts);
436 }
437
438 let (tx, rx) = watch::channel(false);
439
440 let execution_phase_watch = broadcast::channel(100).0;
441 let bootstrap = Arc::new(Mutex::new(Bootstrap::new(
442 &opt,
443 &conf,
444 &execution_phase_watch,
445 )));
446
447 Server {
448 services: Default::default(),
449 init_services: Default::default(),
450 shutdown_watch: tx,
451 shutdown_recv: rx,
452 execution_phase_watch,
453 configuration: Arc::new(conf),
454 options: opt,
455 dependencies: Arc::new(Mutex::new(DependencyGraph::new())),
456 bootstrap,
457 }
458 }
459
460 pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
468 let opt = opt.into();
469 let (tx, rx) = watch::channel(false);
470
471 let execution_phase_watch = broadcast::channel(100).0;
472 let conf = if let Some(opt) = opt.as_ref() {
473 opt.conf.as_ref().map_or_else(
474 || {
475 ServerConf::new_with_opt_override(opt).ok_or_else(|| {
477 Error::explain(ErrorType::ReadError, "Conf generation failed")
478 })
479 },
480 |_| {
481 ServerConf::load_yaml_with_opt_override(opt)
483 },
484 )
485 } else {
486 ServerConf::new()
487 .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
488 }?;
489
490 let bootstrap = Arc::new(Mutex::new(Bootstrap::new(
491 &opt,
492 &conf,
493 &execution_phase_watch,
494 )));
495
496 Ok(Server {
497 services: Default::default(),
498 init_services: Default::default(),
499 shutdown_watch: tx,
500 shutdown_recv: rx,
501 execution_phase_watch,
502 configuration: Arc::new(conf),
503 options: opt,
504 dependencies: Arc::new(Mutex::new(DependencyGraph::new())),
505 bootstrap,
506 })
507 }
508
509 fn add_init_service(&mut self, service: impl ServiceWithDependents + 'static) {
511 let boxed_service = Box::new(service);
512 self.init_services.push(boxed_service);
513 }
514
515 fn apply_init_service_dependencies(&mut self) {
517 let services = self
518 .services
519 .values()
520 .map(|service| service.service_handle.clone())
521 .collect::<Vec<_>>();
522 let global_deps = self
523 .init_services
524 .drain(..)
525 .collect::<Vec<_>>()
526 .into_iter()
527 .map(|dep| self.add_boxed_service(dep))
528 .collect::<Vec<_>>();
529 for service in services {
530 service.add_dependencies(&global_deps);
531 }
532 }
533
534 pub fn add_service(&mut self, service: impl ServiceWithDependents + 'static) -> ServiceHandle {
548 self.add_boxed_service(Box::new(service))
549 }
550
551 pub fn add_boxed_service(
565 &mut self,
566 service_box: Box<dyn ServiceWithDependents>,
567 ) -> ServiceHandle {
568 let name = service_box.name().to_string();
569
570 let (tx, rx) = watch::channel(false);
572
573 let id = self.dependencies.lock().add_node(name.clone(), rx.clone());
574
575 let service_handle = ServiceHandle::new(id, name, rx, &self.dependencies);
576
577 let wrapper = ServiceWrapper {
578 ready_notifier: Some(ServiceReadyNotifier::new(tx)),
579 service: service_box,
580 service_handle: service_handle.clone(),
581 };
582
583 self.services.insert(id, wrapper);
584
585 service_handle
586 }
587
588 pub fn add_services(
592 &mut self,
593 services: Vec<Box<dyn ServiceWithDependents>>,
594 ) -> Vec<ServiceHandle> {
595 services
596 .into_iter()
597 .map(|service| self.add_boxed_service(service))
598 .collect()
599 }
600
601 pub fn bootstrap(&mut self) {
606 self.bootstrap.lock().bootstrap();
607 }
608
609 pub fn bootstrap_as_a_service(&mut self) -> ServiceHandle {
617 let bootstrap_service =
618 background_service("Bootstrap Service", BootstrapService::new(&self.bootstrap));
619
620 let sentry_service = background_service(
621 "Sentry Init Service",
622 SentryInitService::new(&self.bootstrap),
623 );
624
625 self.add_init_service(sentry_service);
626
627 self.add_service(bootstrap_service)
628 }
629
630 pub fn run_forever(self) -> ! {
638 self.run(RunArgs::default());
639
640 std::process::exit(0)
641 }
642
643 pub fn run(mut self, run_args: RunArgs) {
654 self.apply_init_service_dependencies();
655
656 info!("Server starting");
657
658 let conf = self.configuration.as_ref();
659
660 #[cfg(unix)]
661 if conf.daemon {
662 info!("Daemonizing the server");
663 fast_timeout::pause_for_fork();
664 daemonize(&self.configuration);
665 fast_timeout::unpause();
666 }
667
668 #[cfg(windows)]
669 if conf.daemon {
670 panic!("Daemonizing under windows is not supported");
671 }
672
673 let mut runtimes: Vec<(Runtime, String)> = Vec::new();
675
676 let startup_order = match self.dependencies.lock().topological_sort() {
678 Ok(order) => order,
679 Err(e) => {
680 error!("Failed to determine service startup order: {}", e);
681 std::process::exit(1);
682 }
683 };
684
685 let service_names: Vec<String> = startup_order
687 .iter()
688 .map(|(_, service)| service.name.clone())
689 .collect();
690 info!("Starting services in dependency order: {:?}", service_names);
691
692 for (service_id, service) in startup_order {
694 let mut wrapper = match self.services.remove(&service_id) {
695 Some(w) => w,
696 None => {
697 warn!(
698 "Service ID {:?}-{} in startup order but not found",
699 service_id, service.name
700 );
701 continue;
702 }
703 };
704
705 let threads = wrapper.service.threads().unwrap_or(conf.threads);
706 let name = wrapper.service.name().to_string();
707
708 let dependencies = self
710 .dependencies
711 .lock()
712 .get_dependencies(wrapper.service_handle.id);
713
714 let ready_notifier = wrapper
718 .ready_notifier
719 .take()
720 .expect("Service notifier should exist");
721
722 if !dependencies.is_empty() {
723 info!(
724 "Service '{name}' will wait for dependencies: {:?}",
725 dependencies.iter().map(|s| &s.name).collect::<Vec<_>>()
726 );
727 } else {
728 info!("Starting service: {}", name);
729 }
730
731 let dependency_watches = dependencies
732 .iter()
733 .map(|s| s.ready_watch.clone())
734 .collect::<Vec<_>>();
735
736 let runtime = Server::run_service(
737 wrapper.service,
738 #[cfg(unix)]
739 self.listen_fds(),
740 self.shutdown_recv.clone(),
741 threads,
742 conf.work_stealing,
743 self.configuration.listener_tasks_per_fd,
744 ready_notifier,
745 dependency_watches,
746 );
747 runtimes.push((runtime, name));
748 }
749
750 let server_runtime = Server::create_runtime("Server", 1, true);
753 #[cfg(unix)]
754 let shutdown_type = server_runtime
755 .get_handle()
756 .block_on(self.main_loop(run_args));
757 #[cfg(windows)]
758 let shutdown_type = server_runtime
759 .get_handle()
760 .block_on(self.main_loop(run_args));
761
762 self.execution_phase_watch
763 .send(ExecutionPhase::ShutdownStarted)
764 .ok();
765
766 if matches!(shutdown_type, ShutdownType::Graceful) {
767 self.execution_phase_watch
768 .send(ExecutionPhase::ShutdownGracePeriod)
769 .ok();
770
771 let exit_timeout = self
772 .configuration
773 .as_ref()
774 .grace_period_seconds
775 .unwrap_or(EXIT_TIMEOUT);
776 info!("Graceful shutdown: grace period {}s starts", exit_timeout);
777 thread::sleep(Duration::from_secs(exit_timeout));
778 info!("Graceful shutdown: grace period ends");
779 }
780
781 let shutdown_timeout = match shutdown_type {
783 ShutdownType::Quick => Duration::from_secs(0),
784 ShutdownType::Graceful => Duration::from_secs(
785 self.configuration
786 .as_ref()
787 .graceful_shutdown_timeout_seconds
788 .unwrap_or(5),
789 ),
790 };
791
792 self.execution_phase_watch
793 .send(ExecutionPhase::ShutdownRuntimes)
794 .ok();
795
796 let shutdowns: Vec<_> = runtimes
797 .into_iter()
798 .map(|(rt, name)| {
799 info!("Waiting for runtimes to exit!");
800 let join = thread::spawn(move || {
801 rt.shutdown_timeout(shutdown_timeout);
802 thread::sleep(shutdown_timeout)
803 });
804 (join, name)
805 })
806 .collect();
807 for (shutdown, name) in shutdowns {
808 info!("Waiting for service runtime {} to exit", name);
809 if let Err(e) = shutdown.join() {
810 error!("Failed to shutdown service runtime {}: {:?}", name, e);
811 }
812 debug!("Service runtime {} has exited", name);
813 }
814 info!("All runtimes exited, exiting now");
815
816 self.execution_phase_watch
817 .send(ExecutionPhase::Terminated)
818 .ok();
819 }
820
821 fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
822 if work_steal {
823 Runtime::new_steal(threads, name)
824 } else {
825 Runtime::new_no_steal(threads, name)
826 }
827 }
828}