pingora_core/server/
mod.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Server process and configuration management
16
17pub mod configuration;
18#[cfg(unix)]
19mod daemon;
20#[cfg(unix)]
21pub(crate) mod transfer_fd;
22
23use async_trait::async_trait;
24#[cfg(unix)]
25use daemon::daemonize;
26use log::{debug, error, info, warn};
27use pingora_runtime::Runtime;
28use pingora_timeout::fast_timeout;
29#[cfg(feature = "sentry")]
30use sentry::ClientOptions;
31use std::sync::Arc;
32use std::thread;
33#[cfg(unix)]
34use tokio::signal::unix;
35use tokio::sync::{watch, Mutex};
36use tokio::time::{sleep, Duration};
37
38use crate::services::Service;
39use configuration::{Opt, ServerConf};
40#[cfg(unix)]
41pub use transfer_fd::Fds;
42
43use pingora_error::{Error, ErrorType, Result};
44
/// Default grace period, in seconds, used when `grace_period_seconds` is not configured.
///
/// During this window existing sessions get the chance to finish before the process exits.
const EXIT_TIMEOUT: u64 = 5 * 60;

/// Seconds to wait, after the listening sockets have been handed to the new process during an
/// upgrade, before the graceful shutdown is broadcast. This gives the new service time to get
/// ready.
const CLOSE_TIMEOUT: u64 = 5;
51
/// How `Server::run` tears down the service runtimes once a shutdown has been requested.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ShutdownType {
    /// Wait for the grace period, then give the runtimes a bounded time to stop.
    Graceful,
    /// Skip the grace period and stop the runtimes with a zero timeout.
    Quick,
}
56
57/// The receiver for server's shutdown event. The value will turn to true once the server starts
58/// to shutdown
59pub type ShutdownWatch = watch::Receiver<bool>;
60#[cfg(unix)]
61pub type ListenFds = Arc<Mutex<Fds>>;
62
/// The type of shutdown process that has been requested.
///
/// The variant descriptions explain how `Server::run` handles each request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ShutdownSignal {
    /// Send the listening sockets (if any were loaded) to the new process, then wait for
    /// [ServerConf::grace_period_seconds] before starting runtime shutdown with
    /// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
    GracefulUpgrade,
    /// Broadcast a graceful shutdown to all services, then wait for
    /// [ServerConf::grace_period_seconds] before starting runtime shutdown with
    /// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
    GracefulTerminate,
    /// Skip the grace period and shut the runtimes down with no timeout.
    FastShutdown,
}
75
76/// Watcher of a shutdown signal, e.g., [UnixShutdownSignalWatch] for Unix-like
77/// platforms.
78#[async_trait]
79pub trait ShutdownSignalWatch {
80    /// Returns the desired shutdown type once one has been requested.
81    async fn recv(&self) -> ShutdownSignal;
82}
83
84/// A Unix shutdown watcher that awaits for Unix signals.
85///
86/// - `SIGQUIT`: graceful upgrade
87/// - `SIGTERM`: graceful terminate
88/// - `SIGINT`: fast shutdown
89#[cfg(unix)]
90pub struct UnixShutdownSignalWatch;
91
92#[cfg(unix)]
93#[async_trait]
94impl ShutdownSignalWatch for UnixShutdownSignalWatch {
95    async fn recv(&self) -> ShutdownSignal {
96        let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
97        let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
98        let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
99
100        tokio::select! {
101            _ = graceful_upgrade_signal.recv() => {
102                ShutdownSignal::GracefulUpgrade
103            },
104            _ = graceful_terminate_signal.recv() => {
105                ShutdownSignal::GracefulTerminate
106            },
107            _ = fast_shutdown_signal.recv() => {
108                ShutdownSignal::FastShutdown
109            },
110        }
111    }
112}
113
114/// Arguments to configure running of the pingora server.
115pub struct RunArgs {
116    /// Signal for initating shutdown
117    #[cfg(unix)]
118    pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
119}
120
121impl Default for RunArgs {
122    #[cfg(unix)]
123    fn default() -> Self {
124        Self {
125            shutdown_signal: Box::new(UnixShutdownSignalWatch),
126        }
127    }
128
129    #[cfg(windows)]
130    fn default() -> Self {
131        Self {}
132    }
133}
134
135/// The server object
136///
137/// This object represents an entire pingora server process which may have multiple independent
138/// services (see [crate::services]). The server object handles signals, reading configuration,
139/// zero downtime upgrade and error reporting.
140pub struct Server {
141    services: Vec<Box<dyn Service>>,
142    #[cfg(unix)]
143    listen_fds: Option<ListenFds>,
144    shutdown_watch: watch::Sender<bool>,
145    // TODO: we many want to drop this copy to let sender call closed()
146    shutdown_recv: ShutdownWatch,
147    /// The parsed server configuration
148    pub configuration: Arc<ServerConf>,
149    /// The parser command line options
150    pub options: Option<Opt>,
151    #[cfg(feature = "sentry")]
152    #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
153    /// The Sentry ClientOptions.
154    ///
155    /// Panics and other events sentry captures will be sent to this DSN **only in release mode**
156    pub sentry: Option<ClientOptions>,
157}
158
// TODO: delete the pid file on exit
160
161impl Server {
162    #[cfg(unix)]
163    async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
164        // waiting for exit signal
165        match run_args.shutdown_signal.recv().await {
166            ShutdownSignal::FastShutdown => {
167                info!("SIGINT received, exiting");
168                ShutdownType::Quick
169            }
170            ShutdownSignal::GracefulTerminate => {
171                // we receive a graceful terminate, all instances are instructed to stop
172                info!("SIGTERM received, gracefully exiting");
173                // graceful shutdown if there are listening sockets
174                info!("Broadcasting graceful shutdown");
175                match self.shutdown_watch.send(true) {
176                    Ok(_) => {
177                        info!("Graceful shutdown started!");
178                    }
179                    Err(e) => {
180                        error!("Graceful shutdown broadcast failed: {e}");
181                    }
182                }
183                info!("Broadcast graceful shutdown complete");
184                ShutdownType::Graceful
185            }
186            ShutdownSignal::GracefulUpgrade => {
187                // TODO: still need to select! on signals in case a fast shutdown is needed
188                // aka: move below to another task and only kick it off here
189                info!("SIGQUIT received, sending socks and gracefully exiting");
190                if let Some(fds) = &self.listen_fds {
191                    let fds = fds.lock().await;
192                    info!("Trying to send socks");
193                    // XXX: this is blocking IO
194                    match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
195                        Ok(_) => {
196                            info!("listener sockets sent");
197                        }
198                        Err(e) => {
199                            error!("Unable to send listener sockets to new process: {e}");
200                            // sentry log error on fd send failure
201                            #[cfg(all(not(debug_assertions), feature = "sentry"))]
202                            sentry::capture_error(&e);
203                        }
204                    }
205                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
206                    info!("Broadcasting graceful shutdown");
207                    // gracefully exiting
208                    match self.shutdown_watch.send(true) {
209                        Ok(_) => {
210                            info!("Graceful shutdown started!");
211                        }
212                        Err(e) => {
213                            error!("Graceful shutdown broadcast failed: {e}");
214                            // switch to fast shutdown
215                            return ShutdownType::Graceful;
216                        }
217                    }
218                    info!("Broadcast graceful shutdown complete");
219                    ShutdownType::Graceful
220                } else {
221                    info!("No socks to send, shutting down.");
222                    ShutdownType::Graceful
223                }
224            }
225        }
226    }
227
228    fn run_service(
229        mut service: Box<dyn Service>,
230        #[cfg(unix)] fds: Option<ListenFds>,
231        shutdown: ShutdownWatch,
232        threads: usize,
233        work_stealing: bool,
234        listeners_per_fd: usize,
235    ) -> Runtime
236// NOTE: we need to keep the runtime outside async since
237        // otherwise the runtime will be dropped.
238    {
239        let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
240        service_runtime.get_handle().spawn(async move {
241            service
242                .start_service(
243                    #[cfg(unix)]
244                    fds,
245                    shutdown,
246                    listeners_per_fd,
247                )
248                .await;
249            info!("service exited.")
250        });
251        service_runtime
252    }
253
254    #[cfg(unix)]
255    fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
256        let mut fds = Fds::new();
257        if upgrade {
258            debug!("Trying to receive socks");
259            fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
260        }
261        self.listen_fds = Some(Arc::new(Mutex::new(fds)));
262        Ok(())
263    }
264
265    /// Create a new [`Server`], using the [`Opt`] and [`ServerConf`] values provided
266    ///
267    /// This method is intended for pingora frontends that are NOT using the built-in
268    /// command line and configuration file parsing, and are instead using their own.
269    ///
270    /// If a configuration file path is provided as part of `opt`, it will be ignored
271    /// and a warning will be logged.
272    pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
273        let opt = raw_opt.into();
274        if let Some(opts) = &opt {
275            if let Some(c) = opts.conf.as_ref() {
276                warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
277            }
278            conf.merge_with_opt(opts);
279        }
280
281        let (tx, rx) = watch::channel(false);
282
283        Server {
284            services: vec![],
285            #[cfg(unix)]
286            listen_fds: None,
287            shutdown_watch: tx,
288            shutdown_recv: rx,
289            configuration: Arc::new(conf),
290            options: opt,
291            #[cfg(feature = "sentry")]
292            sentry: None,
293        }
294    }
295
296    /// Create a new [`Server`].
297    ///
298    /// Only one [`Server`] needs to be created for a process. A [`Server`] can hold multiple
299    /// independent services.
300    ///
301    /// Command line options can either be passed by parsing the command line arguments via
302    /// `Opt::parse_args()`, or be generated by other means.
303    pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
304        let opt = opt.into();
305        let (tx, rx) = watch::channel(false);
306
307        let conf = if let Some(opt) = opt.as_ref() {
308            opt.conf.as_ref().map_or_else(
309                || {
310                    // options, no conf, generated
311                    ServerConf::new_with_opt_override(opt).ok_or_else(|| {
312                        Error::explain(ErrorType::ReadError, "Conf generation failed")
313                    })
314                },
315                |_| {
316                    // options and conf loaded
317                    ServerConf::load_yaml_with_opt_override(opt)
318                },
319            )
320        } else {
321            ServerConf::new()
322                .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
323        }?;
324
325        Ok(Server {
326            services: vec![],
327            #[cfg(unix)]
328            listen_fds: None,
329            shutdown_watch: tx,
330            shutdown_recv: rx,
331            configuration: Arc::new(conf),
332            options: opt,
333            #[cfg(feature = "sentry")]
334            sentry: None,
335        })
336    }
337
338    /// Add a service to this server.
339    ///
340    /// A service is anything that implements [`Service`].
341    pub fn add_service(&mut self, service: impl Service + 'static) {
342        self.services.push(Box::new(service));
343    }
344
345    /// Similar to [`Self::add_service()`], but take a list of services
346    pub fn add_services(&mut self, services: Vec<Box<dyn Service>>) {
347        self.services.extend(services);
348    }
349
350    /// Prepare the server to start
351    ///
352    /// When trying to zero downtime upgrade from an older version of the server which is already
353    /// running, this function will try to get all its listening sockets in order to take them over.
354    pub fn bootstrap(&mut self) {
355        info!("Bootstrap starting");
356        debug!("{:#?}", self.options);
357
358        /* only init sentry in release builds */
359        #[cfg(all(not(debug_assertions), feature = "sentry"))]
360        let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
361
362        if self.options.as_ref().is_some_and(|o| o.test) {
363            info!("Server Test passed, exiting");
364            std::process::exit(0);
365        }
366
367        // load fds
368        #[cfg(unix)]
369        match self.load_fds(self.options.as_ref().is_some_and(|o| o.upgrade)) {
370            Ok(_) => {
371                info!("Bootstrap done");
372            }
373            Err(e) => {
374                // sentry log error on fd load failure
375                #[cfg(all(not(debug_assertions), feature = "sentry"))]
376                sentry::capture_error(&e);
377
378                error!("Bootstrap failed on error: {:?}, exiting.", e);
379                std::process::exit(1);
380            }
381        }
382    }
383
384    /// Start the server using [Self::run] and default [RunArgs].
385    pub fn run_forever(self) -> ! {
386        info!("Server starting");
387
388        self.run(RunArgs::default());
389
390        info!("All runtimes exited, exiting now");
391        std::process::exit(0)
392    }
393
394    /// Start the server
395    ///
396    /// This function will block forever until the server needs to quit. So this would be the last
397    /// function to call for this object.
398    ///
399    /// Note: this function may fork the process for daemonization, so any additional threads created
400    /// before this function will be lost to any service logic once this function is called.
401    pub fn run(mut self, run_args: RunArgs) {
402        info!("Server starting");
403
404        let conf = self.configuration.as_ref();
405
406        #[cfg(unix)]
407        if conf.daemon {
408            info!("Daemonizing the server");
409            fast_timeout::pause_for_fork();
410            daemonize(&self.configuration);
411            fast_timeout::unpause();
412        }
413
414        #[cfg(windows)]
415        if conf.daemon {
416            panic!("Daemonizing under windows is not supported");
417        }
418
419        /* only init sentry in release builds */
420        #[cfg(all(not(debug_assertions), feature = "sentry"))]
421        let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
422
423        let mut runtimes: Vec<Runtime> = Vec::new();
424
425        while let Some(service) = self.services.pop() {
426            let threads = service.threads().unwrap_or(conf.threads);
427            let runtime = Server::run_service(
428                service,
429                #[cfg(unix)]
430                self.listen_fds.clone(),
431                self.shutdown_recv.clone(),
432                threads,
433                conf.work_stealing,
434                self.configuration.listener_tasks_per_fd,
435            );
436            runtimes.push(runtime);
437        }
438
439        // blocked on main loop so that it runs forever
440        // Only work steal runtime can use block_on()
441        let server_runtime = Server::create_runtime("Server", 1, true);
442        #[cfg(unix)]
443        let shutdown_type = server_runtime
444            .get_handle()
445            .block_on(self.main_loop(run_args));
446        #[cfg(windows)]
447        let shutdown_type = ShutdownType::Graceful;
448
449        if matches!(shutdown_type, ShutdownType::Graceful) {
450            let exit_timeout = self
451                .configuration
452                .as_ref()
453                .grace_period_seconds
454                .unwrap_or(EXIT_TIMEOUT);
455            info!("Graceful shutdown: grace period {}s starts", exit_timeout);
456            thread::sleep(Duration::from_secs(exit_timeout));
457            info!("Graceful shutdown: grace period ends");
458        }
459
460        // Give tokio runtimes time to exit
461        let shutdown_timeout = match shutdown_type {
462            ShutdownType::Quick => Duration::from_secs(0),
463            ShutdownType::Graceful => Duration::from_secs(
464                self.configuration
465                    .as_ref()
466                    .graceful_shutdown_timeout_seconds
467                    .unwrap_or(5),
468            ),
469        };
470        let shutdowns: Vec<_> = runtimes
471            .into_iter()
472            .map(|rt| {
473                info!("Waiting for runtimes to exit!");
474                thread::spawn(move || {
475                    rt.shutdown_timeout(shutdown_timeout);
476                    thread::sleep(shutdown_timeout)
477                })
478            })
479            .collect();
480        for shutdown in shutdowns {
481            if let Err(e) = shutdown.join() {
482                error!("Failed to shutdown runtime: {:?}", e);
483            }
484        }
485    }
486
487    fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
488        if work_steal {
489            Runtime::new_steal(threads, name)
490        } else {
491            Runtime::new_no_steal(threads, name)
492        }
493    }
494}