pingora_core/server/
mod.rs

1// Copyright 2025 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Server process and configuration management
16
17pub mod configuration;
18#[cfg(unix)]
19mod daemon;
20#[cfg(unix)]
21pub(crate) mod transfer_fd;
22
23use async_trait::async_trait;
24#[cfg(unix)]
25use daemon::daemonize;
26use log::{debug, error, info, warn};
27use pingora_runtime::Runtime;
28use pingora_timeout::fast_timeout;
29#[cfg(feature = "sentry")]
30use sentry::ClientOptions;
31use std::sync::Arc;
32use std::thread;
33#[cfg(unix)]
34use tokio::signal::unix;
35use tokio::sync::{broadcast, watch, Mutex};
36use tokio::time::{sleep, Duration};
37
38use crate::services::Service;
39use configuration::{Opt, ServerConf};
40#[cfg(unix)]
41pub use transfer_fd::Fds;
42
43use pingora_error::{Error, ErrorType, Result};
44
/* Default time, in seconds, to wait before exiting the program.
This is the grace period for all existing sessions to finish. It is only used
when `grace_period_seconds` is not set in the server configuration. */
const EXIT_TIMEOUT: u64 = 60 * 5;
/* Time, in seconds, to wait before shutting down listening sockets.
This is the grace period for the new service to get ready; during a graceful
upgrade it is waited out after the listening sockets have been sent and
before the shutdown is broadcast. */
const CLOSE_TIMEOUT: u64 = 5;
51
/// How the server winds down once the main loop has returned.
enum ShutdownType {
    /// Wait for the grace period first, then give the runtimes a timeout to exit.
    Graceful,
    /// Skip the grace period and shut the runtimes down with a zero timeout.
    Quick,
}
56
/// The execution phase the server is currently in.
///
/// Phases are published through the receiver returned by
/// `Server::watch_execution_phase()`. The enum is `#[non_exhaustive]`, so
/// matches outside this crate need a wildcard arm. Phases can be compared
/// with `==`, which makes it easy to wait for or assert on a specific phase.
#[derive(Clone, Debug, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExecutionPhase {
    /// The server was created, but has not started yet.
    Setup,

    /// Services are being prepared.
    ///
    /// During graceful upgrades this phase acquires the listening FDs from the old process.
    Bootstrap,

    /// Bootstrap has finished, listening FDs have been transferred.
    BootstrapComplete,

    /// The server is running and is listening for shutdown signals.
    Running,

    /// A QUIT signal was received, indicating that a new process wants to take over.
    ///
    /// The server is trying to send the fds to the new process over a Unix socket.
    GracefulUpgradeTransferringFds,

    /// FDs have been sent to the new process.
    /// Waiting a fixed amount of time to allow the new process to take the sockets.
    GracefulUpgradeCloseTimeout,

    /// A TERM signal was received, indicating that the server should shut down gracefully.
    GracefulTerminate,

    /// The server is shutting down.
    ShutdownStarted,

    /// Waiting for the configured grace period to end before shutting down.
    ShutdownGracePeriod,

    /// Wait for runtimes to finish.
    ShutdownRuntimes,

    /// The server has stopped.
    Terminated,
}
99
/// The receiver for the server's shutdown event. The value will turn to true once the server
/// starts to shut down.
pub type ShutdownWatch = watch::Receiver<bool>;
/// The server's listening file descriptors, shared behind an async mutex.
#[cfg(unix)]
pub type ListenFds = Arc<Mutex<Fds>>;
105
/// The type of shutdown process that has been requested.
#[derive(Debug)]
pub enum ShutdownSignal {
    /// Send file descriptors to the new process, then wait for
    /// [ServerConf::grace_period_seconds] before starting runtime shutdown with
    /// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
    GracefulUpgrade,
    /// Wait for [ServerConf::grace_period_seconds] before starting runtime shutdown with
    /// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
    GracefulTerminate,
    /// Shutdown with no grace period and no timeout for runtime shutdown.
    FastShutdown,
}
118
/// Watcher of a shutdown signal, e.g., [UnixShutdownSignalWatch] for Unix-like
/// platforms.
///
/// The server awaits the watcher once in its main loop and starts shutting
/// down according to the returned [ShutdownSignal].
#[async_trait]
pub trait ShutdownSignalWatch {
    /// Returns the desired shutdown type once one has been requested.
    async fn recv(&self) -> ShutdownSignal;
}
126
127/// A Unix shutdown watcher that awaits for Unix signals.
128///
129/// - `SIGQUIT`: graceful upgrade
130/// - `SIGTERM`: graceful terminate
131/// - `SIGINT`: fast shutdown
132#[cfg(unix)]
133pub struct UnixShutdownSignalWatch;
134
135#[cfg(unix)]
136#[async_trait]
137impl ShutdownSignalWatch for UnixShutdownSignalWatch {
138    async fn recv(&self) -> ShutdownSignal {
139        let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
140        let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
141        let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
142
143        tokio::select! {
144            _ = graceful_upgrade_signal.recv() => {
145                ShutdownSignal::GracefulUpgrade
146            },
147            _ = graceful_terminate_signal.recv() => {
148                ShutdownSignal::GracefulTerminate
149            },
150            _ = fast_shutdown_signal.recv() => {
151                ShutdownSignal::FastShutdown
152            },
153        }
154    }
155}
156
/// Arguments to configure running of the pingora server.
pub struct RunArgs {
    /// Signal for initiating shutdown
    #[cfg(unix)]
    pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}
163
164impl Default for RunArgs {
165    #[cfg(unix)]
166    fn default() -> Self {
167        Self {
168            shutdown_signal: Box::new(UnixShutdownSignalWatch),
169        }
170    }
171
172    #[cfg(windows)]
173    fn default() -> Self {
174        Self {}
175    }
176}
177
/// The server object
///
/// This object represents an entire pingora server process which may have multiple independent
/// services (see [crate::services]). The server object handles signals, reading configuration,
/// zero downtime upgrade and error reporting.
pub struct Server {
    // Services registered through `add_service()` / `add_services()`.
    services: Vec<Box<dyn Service>>,
    // Listening FDs; `None` until `bootstrap()` has loaded them.
    #[cfg(unix)]
    listen_fds: Option<ListenFds>,
    // Sender of the shutdown flag; `true` is sent when a graceful shutdown starts.
    shutdown_watch: watch::Sender<bool>,
    // TODO: we may want to drop this copy to let sender call closed()
    shutdown_recv: ShutdownWatch,

    /// Tracks the execution phase of the server during upgrades and graceful shutdowns.
    ///
    /// Users can subscribe to the phase with [`Self::watch_execution_phase()`].
    execution_phase_watch: broadcast::Sender<ExecutionPhase>,

    /// The parsed server configuration
    pub configuration: Arc<ServerConf>,
    /// The parsed command line options
    pub options: Option<Opt>,
    #[cfg(feature = "sentry")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
    /// The Sentry ClientOptions.
    ///
    /// Panics and other events sentry captures will be sent to this DSN **only in release mode**
    pub sentry: Option<ClientOptions>,
}
207
208// TODO: delete the pid when exit
209
210impl Server {
211    /// Acquire a receiver for the server's execution phase.
212    ///
213    /// The receiver will produce values for each transition.
214    pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
215        self.execution_phase_watch.subscribe()
216    }
217
218    #[cfg(unix)]
219    async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
220        // waiting for exit signal
221
222        self.execution_phase_watch
223            .send(ExecutionPhase::Running)
224            .ok();
225
226        match run_args.shutdown_signal.recv().await {
227            ShutdownSignal::FastShutdown => {
228                info!("SIGINT received, exiting");
229                ShutdownType::Quick
230            }
231            ShutdownSignal::GracefulTerminate => {
232                // we receive a graceful terminate, all instances are instructed to stop
233                info!("SIGTERM received, gracefully exiting");
234                // graceful shutdown if there are listening sockets
235                info!("Broadcasting graceful shutdown");
236                match self.shutdown_watch.send(true) {
237                    Ok(_) => {
238                        info!("Graceful shutdown started!");
239                    }
240                    Err(e) => {
241                        error!("Graceful shutdown broadcast failed: {e}");
242                    }
243                }
244                info!("Broadcast graceful shutdown complete");
245
246                self.execution_phase_watch
247                    .send(ExecutionPhase::GracefulTerminate)
248                    .ok();
249
250                ShutdownType::Graceful
251            }
252            ShutdownSignal::GracefulUpgrade => {
253                // TODO: still need to select! on signals in case a fast shutdown is needed
254                // aka: move below to another task and only kick it off here
255                info!("SIGQUIT received, sending socks and gracefully exiting");
256
257                self.execution_phase_watch
258                    .send(ExecutionPhase::GracefulUpgradeTransferringFds)
259                    .ok();
260
261                if let Some(fds) = &self.listen_fds {
262                    let fds = fds.lock().await;
263                    info!("Trying to send socks");
264                    // XXX: this is blocking IO
265                    match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
266                        Ok(_) => {
267                            info!("listener sockets sent");
268                        }
269                        Err(e) => {
270                            error!("Unable to send listener sockets to new process: {e}");
271                            // sentry log error on fd send failure
272                            #[cfg(all(not(debug_assertions), feature = "sentry"))]
273                            sentry::capture_error(&e);
274                        }
275                    }
276                    self.execution_phase_watch
277                        .send(ExecutionPhase::GracefulUpgradeCloseTimeout)
278                        .ok();
279                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
280                    info!("Broadcasting graceful shutdown");
281                    // gracefully exiting
282                    match self.shutdown_watch.send(true) {
283                        Ok(_) => {
284                            info!("Graceful shutdown started!");
285                        }
286                        Err(e) => {
287                            error!("Graceful shutdown broadcast failed: {e}");
288                            // switch to fast shutdown
289                            return ShutdownType::Graceful;
290                        }
291                    }
292                    info!("Broadcast graceful shutdown complete");
293                    ShutdownType::Graceful
294                } else {
295                    info!("No socks to send, shutting down.");
296                    ShutdownType::Graceful
297                }
298            }
299        }
300    }
301
302    fn run_service(
303        mut service: Box<dyn Service>,
304        #[cfg(unix)] fds: Option<ListenFds>,
305        shutdown: ShutdownWatch,
306        threads: usize,
307        work_stealing: bool,
308        listeners_per_fd: usize,
309    ) -> Runtime
310// NOTE: we need to keep the runtime outside async since
311        // otherwise the runtime will be dropped.
312    {
313        let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
314        service_runtime.get_handle().spawn(async move {
315            service
316                .start_service(
317                    #[cfg(unix)]
318                    fds,
319                    shutdown,
320                    listeners_per_fd,
321                )
322                .await;
323            info!("service exited.")
324        });
325        service_runtime
326    }
327
328    #[cfg(unix)]
329    fn load_fds(&mut self, upgrade: bool) -> Result<(), nix::Error> {
330        let mut fds = Fds::new();
331        if upgrade {
332            debug!("Trying to receive socks");
333            fds.get_from_sock(self.configuration.as_ref().upgrade_sock.as_str())?
334        }
335        self.listen_fds = Some(Arc::new(Mutex::new(fds)));
336        Ok(())
337    }
338
339    /// Create a new [`Server`], using the [`Opt`] and [`ServerConf`] values provided
340    ///
341    /// This method is intended for pingora frontends that are NOT using the built-in
342    /// command line and configuration file parsing, and are instead using their own.
343    ///
344    /// If a configuration file path is provided as part of `opt`, it will be ignored
345    /// and a warning will be logged.
346    pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
347        let opt = raw_opt.into();
348        if let Some(opts) = &opt {
349            if let Some(c) = opts.conf.as_ref() {
350                warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
351            }
352            conf.merge_with_opt(opts);
353        }
354
355        let (tx, rx) = watch::channel(false);
356
357        Server {
358            services: vec![],
359            #[cfg(unix)]
360            listen_fds: None,
361            shutdown_watch: tx,
362            shutdown_recv: rx,
363            execution_phase_watch: broadcast::channel(100).0,
364            configuration: Arc::new(conf),
365            options: opt,
366            #[cfg(feature = "sentry")]
367            sentry: None,
368        }
369    }
370
371    /// Create a new [`Server`].
372    ///
373    /// Only one [`Server`] needs to be created for a process. A [`Server`] can hold multiple
374    /// independent services.
375    ///
376    /// Command line options can either be passed by parsing the command line arguments via
377    /// `Opt::parse_args()`, or be generated by other means.
378    pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
379        let opt = opt.into();
380        let (tx, rx) = watch::channel(false);
381
382        let conf = if let Some(opt) = opt.as_ref() {
383            opt.conf.as_ref().map_or_else(
384                || {
385                    // options, no conf, generated
386                    ServerConf::new_with_opt_override(opt).ok_or_else(|| {
387                        Error::explain(ErrorType::ReadError, "Conf generation failed")
388                    })
389                },
390                |_| {
391                    // options and conf loaded
392                    ServerConf::load_yaml_with_opt_override(opt)
393                },
394            )
395        } else {
396            ServerConf::new()
397                .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
398        }?;
399
400        Ok(Server {
401            services: vec![],
402            #[cfg(unix)]
403            listen_fds: None,
404            shutdown_watch: tx,
405            shutdown_recv: rx,
406            execution_phase_watch: broadcast::channel(100).0,
407            configuration: Arc::new(conf),
408            options: opt,
409            #[cfg(feature = "sentry")]
410            sentry: None,
411        })
412    }
413
414    /// Add a service to this server.
415    ///
416    /// A service is anything that implements [`Service`].
417    pub fn add_service(&mut self, service: impl Service + 'static) {
418        self.services.push(Box::new(service));
419    }
420
421    /// Similar to [`Self::add_service()`], but take a list of services
422    pub fn add_services(&mut self, services: Vec<Box<dyn Service>>) {
423        self.services.extend(services);
424    }
425
426    /// Prepare the server to start
427    ///
428    /// When trying to zero downtime upgrade from an older version of the server which is already
429    /// running, this function will try to get all its listening sockets in order to take them over.
430    pub fn bootstrap(&mut self) {
431        info!("Bootstrap starting");
432        debug!("{:#?}", self.options);
433
434        self.execution_phase_watch
435            .send(ExecutionPhase::Bootstrap)
436            .ok();
437
438        /* only init sentry in release builds */
439        #[cfg(all(not(debug_assertions), feature = "sentry"))]
440        let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
441
442        if self.options.as_ref().is_some_and(|o| o.test) {
443            info!("Server Test passed, exiting");
444            std::process::exit(0);
445        }
446
447        // load fds
448        #[cfg(unix)]
449        match self.load_fds(self.options.as_ref().is_some_and(|o| o.upgrade)) {
450            Ok(_) => {
451                info!("Bootstrap done");
452            }
453            Err(e) => {
454                // sentry log error on fd load failure
455                #[cfg(all(not(debug_assertions), feature = "sentry"))]
456                sentry::capture_error(&e);
457
458                error!("Bootstrap failed on error: {:?}, exiting.", e);
459                std::process::exit(1);
460            }
461        }
462
463        self.execution_phase_watch
464            .send(ExecutionPhase::BootstrapComplete)
465            .ok();
466    }
467
468    /// Start the server using [Self::run] and default [RunArgs].
469    ///
470    /// This function will block forever until the server needs to quit. So this would be the last
471    /// function to call for this object.
472    ///
473    /// Note: this function may fork the process for daemonization, so any additional threads created
474    /// before this function will be lost to any service logic once this function is called.
475    pub fn run_forever(self) -> ! {
476        info!("Server starting");
477
478        self.run(RunArgs::default());
479
480        info!("All runtimes exited, exiting now");
481        std::process::exit(0)
482    }
483
484    /// Run the server until execution finished.
485    ///
486    /// This function will run until the server has been instructed to shut down
487    /// through a signal, and will then wait for all services to finish and
488    /// runtimes to exit.
489    ///
490    /// Note: if daemonization is enabled in the config, this function will
491    /// never return.
492    /// Instead it will either start the daemon process and exit, or panic
493    /// if daemonization fails.
494    pub fn run(mut self, run_args: RunArgs) {
495        info!("Server starting");
496
497        let conf = self.configuration.as_ref();
498
499        #[cfg(unix)]
500        if conf.daemon {
501            info!("Daemonizing the server");
502            fast_timeout::pause_for_fork();
503            daemonize(&self.configuration);
504            fast_timeout::unpause();
505        }
506
507        #[cfg(windows)]
508        if conf.daemon {
509            panic!("Daemonizing under windows is not supported");
510        }
511
512        /* only init sentry in release builds */
513        #[cfg(all(not(debug_assertions), feature = "sentry"))]
514        let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
515
516        // Holds tuples of runtimes and their service name.
517        let mut runtimes: Vec<(Runtime, String)> = Vec::new();
518
519        while let Some(service) = self.services.pop() {
520            let threads = service.threads().unwrap_or(conf.threads);
521            let name = service.name().to_string();
522            let runtime = Server::run_service(
523                service,
524                #[cfg(unix)]
525                self.listen_fds.clone(),
526                self.shutdown_recv.clone(),
527                threads,
528                conf.work_stealing,
529                self.configuration.listener_tasks_per_fd,
530            );
531            runtimes.push((runtime, name));
532        }
533
534        // blocked on main loop so that it runs forever
535        // Only work steal runtime can use block_on()
536        let server_runtime = Server::create_runtime("Server", 1, true);
537        #[cfg(unix)]
538        let shutdown_type = server_runtime
539            .get_handle()
540            .block_on(self.main_loop(run_args));
541        #[cfg(windows)]
542        let shutdown_type = ShutdownType::Graceful;
543
544        self.execution_phase_watch
545            .send(ExecutionPhase::ShutdownStarted)
546            .ok();
547
548        if matches!(shutdown_type, ShutdownType::Graceful) {
549            self.execution_phase_watch
550                .send(ExecutionPhase::ShutdownGracePeriod)
551                .ok();
552
553            let exit_timeout = self
554                .configuration
555                .as_ref()
556                .grace_period_seconds
557                .unwrap_or(EXIT_TIMEOUT);
558            info!("Graceful shutdown: grace period {}s starts", exit_timeout);
559            thread::sleep(Duration::from_secs(exit_timeout));
560            info!("Graceful shutdown: grace period ends");
561        }
562
563        // Give tokio runtimes time to exit
564        let shutdown_timeout = match shutdown_type {
565            ShutdownType::Quick => Duration::from_secs(0),
566            ShutdownType::Graceful => Duration::from_secs(
567                self.configuration
568                    .as_ref()
569                    .graceful_shutdown_timeout_seconds
570                    .unwrap_or(5),
571            ),
572        };
573
574        self.execution_phase_watch
575            .send(ExecutionPhase::ShutdownRuntimes)
576            .ok();
577
578        let shutdowns: Vec<_> = runtimes
579            .into_iter()
580            .map(|(rt, name)| {
581                info!("Waiting for runtimes to exit!");
582                let join = thread::spawn(move || {
583                    rt.shutdown_timeout(shutdown_timeout);
584                    thread::sleep(shutdown_timeout)
585                });
586                (join, name)
587            })
588            .collect();
589        for (shutdown, name) in shutdowns {
590            info!("Waiting for service runtime {} to exit", name);
591            if let Err(e) = shutdown.join() {
592                error!("Failed to shutdown service runtime {}: {:?}", name, e);
593            }
594            debug!("Service runtime {} has exited", name);
595        }
596        info!("All runtimes exited, exiting now");
597
598        self.execution_phase_watch
599            .send(ExecutionPhase::Terminated)
600            .ok();
601    }
602
603    fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
604        if work_steal {
605            Runtime::new_steal(threads, name)
606        } else {
607            Runtime::new_no_steal(threads, name)
608        }
609    }
610}