// pingora_core/server/mod.rs
1// Copyright 2026 Cloudflare, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Server process and configuration management
16
17mod bootstrap_services;
18pub mod configuration;
19#[cfg(unix)]
20mod daemon;
21#[cfg(unix)]
22pub(crate) mod transfer_fd;
23
24use async_trait::async_trait;
25#[cfg(unix)]
26use daemon::daemonize;
27use daggy::NodeIndex;
28use log::{debug, error, info, warn};
29use parking_lot::Mutex;
30use pingora_runtime::Runtime;
31use pingora_timeout::fast_timeout;
32#[cfg(feature = "sentry")]
33use sentry::ClientOptions;
34use std::sync::Arc;
35use std::thread;
36use std::time::SystemTime;
37#[cfg(unix)]
38use tokio::signal::unix;
39use tokio::sync::{broadcast, watch, Mutex as TokioMutex};
40use tokio::time::{sleep, Duration};
41
42use crate::prelude::background_service;
43use crate::server::bootstrap_services::{Bootstrap, BootstrapService, SentryInitService};
44use crate::services::{
45    DependencyGraph, ServiceHandle, ServiceReadyNotifier, ServiceReadyWatch, ServiceWithDependents,
46};
47use configuration::{Opt, ServerConf};
48use std::collections::HashMap;
49#[cfg(unix)]
50pub use transfer_fd::Fds;
51
52use pingora_error::{Error, ErrorType, Result};
53
/* Default time (in seconds) to wait before exiting the program.
This is the graceful period for all existing sessions to finish.
Used when `grace_period_seconds` is not configured. */
const EXIT_TIMEOUT: u64 = 60 * 5;
/* Time (in seconds) to wait before shutting down listening sockets during a
graceful upgrade. This is the graceful period for the new process to get ready
to take over the transferred sockets. */
const CLOSE_TIMEOUT: u64 = 5;
60
/// How the server should wind down once a shutdown has been requested.
enum ShutdownType {
    // Honor the configured grace period and runtime shutdown timeout so
    // in-flight sessions can finish.
    Graceful,
    // Exit as fast as possible: no grace period, zero runtime timeout.
    Quick,
}
65
/// Internal wrapper for services with dependency metadata.
pub(crate) struct ServiceWrapper {
    // Readiness notifier handed to the service when it starts. It is
    // `take()`n exactly once during startup, hence the `Option`.
    ready_notifier: Option<ServiceReadyNotifier>,
    // The service implementation itself.
    service: Box<dyn ServiceWithDependents>,
    // Handle identifying this service in the dependency graph.
    service_handle: ServiceHandle,
}
72
/// The execution phase the server is currently in.
///
/// Marked `#[non_exhaustive]`: new phases may be added in future versions, so
/// consumers should always handle unknown variants.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ExecutionPhase {
    /// The server was created, but has not started yet.
    Setup,

    /// Services are being prepared.
    ///
    /// During graceful upgrades this phase acquires the listening FDs from the old process.
    Bootstrap,

    /// Bootstrap has finished, listening FDs have been transferred.
    BootstrapComplete,

    /// The server is running and is listening for shutdown signals.
    Running,

    /// A QUIT signal was received, indicating that a new process wants to take over.
    ///
    /// The server is trying to send the fds to the new process over a Unix socket.
    GracefulUpgradeTransferringFds,

    /// FDs have been sent to the new process.
    /// Waiting a fixed amount of time to allow the new process to take the sockets.
    GracefulUpgradeCloseTimeout,

    /// A TERM signal was received, indicating that the server should shut down gracefully.
    GracefulTerminate,

    /// The server is shutting down.
    ShutdownStarted,

    /// Waiting for the configured grace period to end before shutting down.
    ShutdownGracePeriod,

    /// Wait for runtimes to finish.
    ShutdownRuntimes,

    /// The server has stopped.
    Terminated,
}
115
/// The receiver for server's shutdown event. The value will turn to true once the server starts
/// to shutdown
pub type ShutdownWatch = watch::Receiver<bool>;
/// The listening file descriptors, shared between services and transferred to
/// the new process during a zero-downtime upgrade. Unix only.
#[cfg(unix)]
pub type ListenFds = Arc<TokioMutex<Fds>>;
121
/// The type of shutdown process that has been requested.
#[derive(Debug)]
pub enum ShutdownSignal {
    /// Send file descriptors to the new process before starting runtime shutdown with
    /// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
    ///
    /// On Unix this is triggered by `SIGQUIT` (see [UnixShutdownSignalWatch]).
    GracefulUpgrade,
    /// Wait for [ServerConf::grace_period_seconds] before starting runtime shutdown with
    /// [ServerConf::graceful_shutdown_timeout_seconds] timeout.
    ///
    /// On Unix this is triggered by `SIGTERM`.
    GracefulTerminate,
    /// Shutdown with no timeout for runtime shutdown.
    ///
    /// On Unix this is triggered by `SIGINT`.
    FastShutdown,
}
134
/// Watcher of a shutdown signal, e.g., [UnixShutdownSignalWatch] for Unix-like
/// platforms.
#[async_trait]
pub trait ShutdownSignalWatch {
    /// Returns the desired shutdown type once one has been requested.
    ///
    /// Implementations should stay pending until a shutdown is requested.
    async fn recv(&self) -> ShutdownSignal;
}
142
/// A Unix shutdown watcher that awaits for Unix signals.
///
/// - `SIGQUIT`: graceful upgrade
/// - `SIGTERM`: graceful terminate
/// - `SIGINT`: fast shutdown
#[cfg(unix)]
pub struct UnixShutdownSignalWatch;
150
#[cfg(unix)]
#[async_trait]
impl ShutdownSignalWatch for UnixShutdownSignalWatch {
    /// Await the first of `SIGQUIT` / `SIGTERM` / `SIGINT` and map it to the
    /// corresponding [`ShutdownSignal`].
    async fn recv(&self) -> ShutdownSignal {
        // Registering the handlers can fail only in unusual situations (e.g.
        // outside a tokio runtime); treat that as fatal here.
        let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
        let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
        let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

        // Whichever signal arrives first decides the shutdown type.
        tokio::select! {
            _ = graceful_upgrade_signal.recv() => {
                ShutdownSignal::GracefulUpgrade
            },
            _ = graceful_terminate_signal.recv() => {
                ShutdownSignal::GracefulTerminate
            },
            _ = fast_shutdown_signal.recv() => {
                ShutdownSignal::FastShutdown
            },
        }
    }
}
172
/// Arguments to configure running of the pingora server.
pub struct RunArgs {
    /// Signal for initiating shutdown
    #[cfg(unix)]
    pub shutdown_signal: Box<dyn ShutdownSignalWatch>,
}
179
impl Default for RunArgs {
    /// On Unix, default to watching the standard SIGQUIT/SIGTERM/SIGINT
    /// signals via [`UnixShutdownSignalWatch`].
    #[cfg(unix)]
    fn default() -> Self {
        Self {
            shutdown_signal: Box::new(UnixShutdownSignalWatch),
        }
    }

    /// On Windows there is no shutdown-signal watcher to configure.
    #[cfg(windows)]
    fn default() -> Self {
        Self {}
    }
}
193
/// The server object
///
/// This object represents an entire pingora server process which may have multiple independent
/// services (see [crate::services]). The server object handles signals, reading configuration,
/// zero downtime upgrade and error reporting.
pub struct Server {
    // This is a way to add services that have to be run before any others
    // without requiring dependencies to be set directly
    init_services: Vec<Box<dyn ServiceWithDependents + 'static>>,

    // All registered services, keyed by their node in the dependency graph.
    services: HashMap<NodeIndex, ServiceWrapper>,
    // Sender side of the shutdown broadcast; flipped to `true` on shutdown.
    shutdown_watch: watch::Sender<bool>,
    // TODO: we may want to drop this copy to let sender call closed()
    shutdown_recv: ShutdownWatch,

    /// Tracks the execution phase of the server during upgrades and graceful shutdowns.
    ///
    /// Users can subscribe to the phase with [`Self::watch_execution_phase()`].
    execution_phase_watch: broadcast::Sender<ExecutionPhase>,

    /// Specification of service level dependencies
    dependencies: Arc<Mutex<DependencyGraph>>,

    /// Service initialization
    bootstrap: Arc<Mutex<Bootstrap>>,

    /// The parsed server configuration
    pub configuration: Arc<ServerConf>,
    /// The parsed command line options
    pub options: Option<Opt>,
}
225
// TODO: delete the pid file when the process exits
227
228impl Server {
    /// Acquire a receiver for the server's execution phase.
    ///
    /// The receiver will produce values for each transition.
    ///
    /// NOTE(review): this is backed by a bounded broadcast channel, so a
    /// receiver that lags far behind may miss transitions — confirm whether
    /// callers rely on seeing every phase.
    pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
        self.execution_phase_watch.subscribe()
    }
235
    /// Block until a shutdown signal arrives, broadcast the shutdown to all
    /// services, and return how the shutdown should proceed (Unix variant).
    #[cfg(unix)]
    async fn main_loop(&self, run_args: RunArgs) -> ShutdownType {
        // waiting for exit signal

        self.execution_phase_watch
            .send(ExecutionPhase::Running)
            .ok();

        match run_args.shutdown_signal.recv().await {
            ShutdownSignal::FastShutdown => {
                info!("SIGINT received, exiting");
                ShutdownType::Quick
            }
            ShutdownSignal::GracefulTerminate => {
                // we receive a graceful terminate, all instances are instructed to stop
                info!("SIGTERM received, gracefully exiting");
                // graceful shutdown if there are listening sockets
                info!("Broadcasting graceful shutdown");
                match self.shutdown_watch.send(true) {
                    Ok(_) => {
                        info!("Graceful shutdown started!");
                    }
                    Err(e) => {
                        // send() only fails when every receiver is gone, i.e.
                        // no service is listening for the shutdown anymore.
                        error!("Graceful shutdown broadcast failed: {e}");
                    }
                }
                info!("Broadcast graceful shutdown complete");

                self.execution_phase_watch
                    .send(ExecutionPhase::GracefulTerminate)
                    .ok();

                ShutdownType::Graceful
            }
            ShutdownSignal::GracefulUpgrade => {
                // TODO: still need to select! on signals in case a fast shutdown is needed
                // aka: move below to another task and only kick it off here
                info!("SIGQUIT received, sending socks and gracefully exiting");

                self.execution_phase_watch
                    .send(ExecutionPhase::GracefulUpgradeTransferringFds)
                    .ok();

                if let Some(fds) = self.listen_fds() {
                    let fds = fds.lock().await;
                    info!("Trying to send socks");
                    // XXX: this is blocking IO
                    match fds.send_to_sock(self.configuration.as_ref().upgrade_sock.as_str()) {
                        Ok(_) => {
                            info!("listener sockets sent");
                        }
                        Err(e) => {
                            // The old process keeps running; only report.
                            error!("Unable to send listener sockets to new process: {e}");
                            // sentry log error on fd send failure
                            #[cfg(all(not(debug_assertions), feature = "sentry"))]
                            sentry::capture_error(&e);
                        }
                    }
                    self.execution_phase_watch
                        .send(ExecutionPhase::GracefulUpgradeCloseTimeout)
                        .ok();
                    // Give the new process a fixed window (CLOSE_TIMEOUT) to
                    // start accepting on the transferred sockets.
                    sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
                    info!("Broadcasting graceful shutdown");
                    // gracefully exiting
                    match self.shutdown_watch.send(true) {
                        Ok(_) => {
                            info!("Graceful shutdown started!");
                        }
                        Err(e) => {
                            error!("Graceful shutdown broadcast failed: {e}");
                            // NOTE(review): the previous comment said "switch to
                            // fast shutdown" but a Graceful type is returned, so
                            // the grace period still applies — confirm intent.
                            return ShutdownType::Graceful;
                        }
                    }
                    info!("Broadcast graceful shutdown complete");
                    ShutdownType::Graceful
                } else {
                    info!("No socks to send, shutting down.");
                    ShutdownType::Graceful
                }
            }
        }
    }
319
    /// Block until Ctrl+C arrives, broadcast the shutdown to all services, and
    /// return how the shutdown should proceed (Windows variant).
    #[cfg(windows)]
    async fn main_loop(&self, _run_args: RunArgs) -> ShutdownType {
        // waiting for exit signal

        self.execution_phase_watch
            .send(ExecutionPhase::Running)
            .ok();

        match tokio::signal::ctrl_c().await {
            Ok(()) => {
                info!("Ctrl+C received, gracefully exiting");
                // graceful shutdown if there are listening sockets
                info!("Broadcasting graceful shutdown");
                match self.shutdown_watch.send(true) {
                    Ok(_) => {
                        info!("Graceful shutdown started!");
                    }
                    Err(e) => {
                        // send() only fails when every receiver is gone, i.e.
                        // no service is listening for the shutdown anymore.
                        error!("Graceful shutdown broadcast failed: {e}");
                    }
                }
                info!("Broadcast graceful shutdown complete");

                self.execution_phase_watch
                    .send(ExecutionPhase::GracefulTerminate)
                    .ok();

                ShutdownType::Graceful
            }
            Err(e) => {
                // We cannot observe Ctrl+C at all; bail out quickly instead of
                // running without any way to stop gracefully.
                error!("Unable to listen for shutdown signal: {}", e);
                ShutdownType::Quick
            }
        }
    }
355
    #[cfg(feature = "sentry")]
    #[cfg_attr(docsrs, doc(cfg(feature = "sentry")))]
    /// The Sentry ClientOptions.
    ///
    /// Panics and other events sentry captures will be sent to this DSN **only in release mode**
    pub fn set_sentry_config(&mut self, sentry_config: ClientOptions) {
        // Stored on the bootstrap; applied when the bootstrap services run.
        self.bootstrap.lock().set_sentry_config(Some(sentry_config));
    }
364
    /// Get the configured file descriptors for listening
    ///
    /// Returns `None` when no FDs have been acquired (e.g. bootstrap has not
    /// run or there was nothing to take over).
    #[cfg(unix)]
    fn listen_fds(&self) -> Option<ListenFds> {
        self.bootstrap.lock().get_fds()
    }
370
371    #[allow(clippy::too_many_arguments)]
372    fn run_service(
373        mut service: Box<dyn ServiceWithDependents>,
374        #[cfg(unix)] fds: Option<ListenFds>,
375        shutdown: ShutdownWatch,
376        threads: usize,
377        work_stealing: bool,
378        listeners_per_fd: usize,
379        ready_notifier: ServiceReadyNotifier,
380        dependency_watches: Vec<ServiceReadyWatch>,
381    ) -> Runtime
382// NOTE: we need to keep the runtime outside async since
383        // otherwise the runtime will be dropped.
384    {
385        let service_runtime = Server::create_runtime(service.name(), threads, work_stealing);
386        let service_name = service.name().to_string();
387        service_runtime.get_handle().spawn(async move {
388            // Wait for all dependencies to be ready
389            let mut time_waited_opt: Option<Duration> = None;
390            for mut watch in dependency_watches {
391                let start = SystemTime::now();
392
393                if watch.wait_for(|&ready| ready).await.is_err() {
394                    error!(
395                        "Service '{}' dependency channel closed before ready",
396                        service_name
397                    );
398                }
399
400                *time_waited_opt.get_or_insert_default() += start.elapsed().unwrap_or_default()
401            }
402
403            if let Some(time_waited) = time_waited_opt {
404                service.on_startup_delay(time_waited);
405            }
406
407            // Start the actual service, passing the ready notifier
408            service
409                .start_service(
410                    #[cfg(unix)]
411                    fds,
412                    shutdown,
413                    listeners_per_fd,
414                    ready_notifier,
415                )
416                .await;
417            info!("service '{}' exited.", service_name);
418        });
419        service_runtime
420    }
421
422    /// Create a new [`Server`], using the [`Opt`] and [`ServerConf`] values provided
423    ///
424    /// This method is intended for pingora frontends that are NOT using the built-in
425    /// command line and configuration file parsing, and are instead using their own.
426    ///
427    /// If a configuration file path is provided as part of `opt`, it will be ignored
428    /// and a warning will be logged.
429    pub fn new_with_opt_and_conf(raw_opt: impl Into<Option<Opt>>, mut conf: ServerConf) -> Server {
430        let opt = raw_opt.into();
431        if let Some(opts) = &opt {
432            if let Some(c) = opts.conf.as_ref() {
433                warn!("Ignoring command line argument using '{c}' as configuration, and using provided configuration instead.");
434            }
435            conf.merge_with_opt(opts);
436        }
437
438        let (tx, rx) = watch::channel(false);
439
440        let execution_phase_watch = broadcast::channel(100).0;
441        let bootstrap = Arc::new(Mutex::new(Bootstrap::new(
442            &opt,
443            &conf,
444            &execution_phase_watch,
445        )));
446
447        Server {
448            services: Default::default(),
449            init_services: Default::default(),
450            shutdown_watch: tx,
451            shutdown_recv: rx,
452            execution_phase_watch,
453            configuration: Arc::new(conf),
454            options: opt,
455            dependencies: Arc::new(Mutex::new(DependencyGraph::new())),
456            bootstrap,
457        }
458    }
459
460    /// Create a new [`Server`].
461    ///
462    /// Only one [`Server`] needs to be created for a process. A [`Server`] can hold multiple
463    /// independent services.
464    ///
465    /// Command line options can either be passed by parsing the command line arguments via
466    /// `Opt::parse_args()`, or be generated by other means.
467    pub fn new(opt: impl Into<Option<Opt>>) -> Result<Server> {
468        let opt = opt.into();
469        let (tx, rx) = watch::channel(false);
470
471        let execution_phase_watch = broadcast::channel(100).0;
472        let conf = if let Some(opt) = opt.as_ref() {
473            opt.conf.as_ref().map_or_else(
474                || {
475                    // options, no conf, generated
476                    ServerConf::new_with_opt_override(opt).ok_or_else(|| {
477                        Error::explain(ErrorType::ReadError, "Conf generation failed")
478                    })
479                },
480                |_| {
481                    // options and conf loaded
482                    ServerConf::load_yaml_with_opt_override(opt)
483                },
484            )
485        } else {
486            ServerConf::new()
487                .ok_or_else(|| Error::explain(ErrorType::ReadError, "Conf generation failed"))
488        }?;
489
490        let bootstrap = Arc::new(Mutex::new(Bootstrap::new(
491            &opt,
492            &conf,
493            &execution_phase_watch,
494        )));
495
496        Ok(Server {
497            services: Default::default(),
498            init_services: Default::default(),
499            shutdown_watch: tx,
500            shutdown_recv: rx,
501            execution_phase_watch,
502            configuration: Arc::new(conf),
503            options: opt,
504            dependencies: Arc::new(Mutex::new(DependencyGraph::new())),
505            bootstrap,
506        })
507    }
508
509    /// Add a service that all other services will wait on before starting.
510    fn add_init_service(&mut self, service: impl ServiceWithDependents + 'static) {
511        let boxed_service = Box::new(service);
512        self.init_services.push(boxed_service);
513    }
514
515    /// Add the init services as dependencies for all existing services
516    fn apply_init_service_dependencies(&mut self) {
517        let services = self
518            .services
519            .values()
520            .map(|service| service.service_handle.clone())
521            .collect::<Vec<_>>();
522        let global_deps = self
523            .init_services
524            .drain(..)
525            .collect::<Vec<_>>()
526            .into_iter()
527            .map(|dep| self.add_boxed_service(dep))
528            .collect::<Vec<_>>();
529        for service in services {
530            service.add_dependencies(&global_deps);
531        }
532    }
533
    /// Add a service to this server.
    ///
    /// Returns a [`ServiceHandle`] that can be used to declare dependencies.
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// let db_id = server.add_service(database_service);
    /// let api_id = server.add_service(api_service);
    ///
    /// // Declare that API depends on database
    /// api_id.add_dependency(&db_id);
    /// ```
    pub fn add_service(&mut self, service: impl ServiceWithDependents + 'static) -> ServiceHandle {
        // Boxing here keeps a single registration path via add_boxed_service.
        self.add_boxed_service(Box::new(service))
    }
550
551    /// Add a pre-boxed service to this server.
552    ///
553    /// Returns a [`ServiceHandle`] that can be used to declare dependencies.
554    ///
555    /// # Example
556    ///
557    /// ```rust,ignore
558    /// let db_id = server.add_service(database_service);
559    /// let api_id = server.add_service(api_service);
560    ///
561    /// // Declare that API depends on database
562    /// api_id.add_dependency(&db_id);
563    /// ```
564    pub fn add_boxed_service(
565        &mut self,
566        service_box: Box<dyn ServiceWithDependents>,
567    ) -> ServiceHandle {
568        let name = service_box.name().to_string();
569
570        // Create a readiness notifier for this service
571        let (tx, rx) = watch::channel(false);
572
573        let id = self.dependencies.lock().add_node(name.clone(), rx.clone());
574
575        let service_handle = ServiceHandle::new(id, name, rx, &self.dependencies);
576
577        let wrapper = ServiceWrapper {
578            ready_notifier: Some(ServiceReadyNotifier::new(tx)),
579            service: service_box,
580            service_handle: service_handle.clone(),
581        };
582
583        self.services.insert(id, wrapper);
584
585        service_handle
586    }
587
588    /// Similar to [`Self::add_service()`], but take a list of services.
589    ///
590    /// Returns a `Vec<ServiceHandle>` for all added services.
591    pub fn add_services(
592        &mut self,
593        services: Vec<Box<dyn ServiceWithDependents>>,
594    ) -> Vec<ServiceHandle> {
595        services
596            .into_iter()
597            .map(|service| self.add_boxed_service(service))
598            .collect()
599    }
600
    /// Prepare the server to start
    ///
    /// When trying to zero downtime upgrade from an older version of the server which is already
    /// running, this function will try to get all its listening sockets in order to take them over.
    pub fn bootstrap(&mut self) {
        // Synchronous alternative to bootstrap_as_a_service().
        self.bootstrap.lock().bootstrap();
    }
608
609    /// Create a service that will run to prepare the service to start
610    ///
611    /// The created service will handle the zero-downtime upgrade from an older version of the server
612    /// to this one. It will try to get all its listening sockets in order to take them over.
613    ///
614    /// Other bootstrapping functionality like sentry initialization will also be handled, but as a
615    /// service that will complete before any other service starts.
616    pub fn bootstrap_as_a_service(&mut self) -> ServiceHandle {
617        let bootstrap_service =
618            background_service("Bootstrap Service", BootstrapService::new(&self.bootstrap));
619
620        let sentry_service = background_service(
621            "Sentry Init Service",
622            SentryInitService::new(&self.bootstrap),
623        );
624
625        self.add_init_service(sentry_service);
626
627        self.add_service(bootstrap_service)
628    }
629
    /// Start the server using [Self::run] and default [RunArgs].
    ///
    /// This function will block forever until the server needs to quit. So this would be the last
    /// function to call for this object.
    ///
    /// Note: this function may fork the process for daemonization, so any additional threads created
    /// before this function will be lost to any service logic once this function is called.
    pub fn run_forever(self) -> ! {
        self.run(RunArgs::default());

        // run() has completed the full shutdown sequence at this point.
        std::process::exit(0)
    }
642
    /// Run the server until execution finished.
    ///
    /// This function will run until the server has been instructed to shut down
    /// through a signal, and will then wait for all services to finish and
    /// runtimes to exit.
    ///
    /// Note: if daemonization is enabled in the config, this function will
    /// never return.
    /// Instead it will either start the daemon process and exit, or panic
    /// if daemonization fails.
    pub fn run(mut self, run_args: RunArgs) {
        // Register queued init services and make every other service depend
        // on them, so they start (and become ready) first.
        self.apply_init_service_dependencies();

        info!("Server starting");

        let conf = self.configuration.as_ref();

        #[cfg(unix)]
        if conf.daemon {
            info!("Daemonizing the server");
            // Timer threads do not survive fork(); pause them around daemonize.
            fast_timeout::pause_for_fork();
            daemonize(&self.configuration);
            fast_timeout::unpause();
        }

        #[cfg(windows)]
        if conf.daemon {
            panic!("Daemonizing under windows is not supported");
        }

        // Holds tuples of runtimes and their service name.
        let mut runtimes: Vec<(Runtime, String)> = Vec::new();

        // Get services in topological order (dependencies first)
        let startup_order = match self.dependencies.lock().topological_sort() {
            Ok(order) => order,
            Err(e) => {
                // e.g. a dependency cycle: startup is impossible.
                error!("Failed to determine service startup order: {}", e);
                std::process::exit(1);
            }
        };

        // Log service names in startup order
        let service_names: Vec<String> = startup_order
            .iter()
            .map(|(_, service)| service.name.clone())
            .collect();
        info!("Starting services in dependency order: {:?}", service_names);

        // Start services in dependency order
        for (service_id, service) in startup_order {
            let mut wrapper = match self.services.remove(&service_id) {
                Some(w) => w,
                None => {
                    // Graph and service map disagree; skip rather than abort.
                    warn!(
                        "Service ID {:?}-{} in startup order but not found",
                        service_id, service.name
                    );
                    continue;
                }
            };

            // Per-service thread count overrides the server-wide default.
            let threads = wrapper.service.threads().unwrap_or(conf.threads);
            let name = wrapper.service.name().to_string();

            // Extract dependency watches from the ServiceHandle
            let dependencies = self
                .dependencies
                .lock()
                .get_dependencies(wrapper.service_handle.id);

            // Get the readiness notifier for this service by taking it from the Option.
            // Since service_id is the index, we can directly access it.
            // We take() the notifier, leaving None in its place.
            let ready_notifier = wrapper
                .ready_notifier
                .take()
                .expect("Service notifier should exist");

            if !dependencies.is_empty() {
                info!(
                    "Service '{name}' will wait for dependencies: {:?}",
                    dependencies.iter().map(|s| &s.name).collect::<Vec<_>>()
                );
            } else {
                info!("Starting service: {}", name);
            }

            let dependency_watches = dependencies
                .iter()
                .map(|s| s.ready_watch.clone())
                .collect::<Vec<_>>();

            // The service waits on its dependency watches inside run_service
            // before actually starting.
            let runtime = Server::run_service(
                wrapper.service,
                #[cfg(unix)]
                self.listen_fds(),
                self.shutdown_recv.clone(),
                threads,
                conf.work_stealing,
                self.configuration.listener_tasks_per_fd,
                ready_notifier,
                dependency_watches,
            );
            runtimes.push((runtime, name));
        }

        // blocked on main loop so that it runs forever
        // Only work steal runtime can use block_on()
        let server_runtime = Server::create_runtime("Server", 1, true);
        #[cfg(unix)]
        let shutdown_type = server_runtime
            .get_handle()
            .block_on(self.main_loop(run_args));
        #[cfg(windows)]
        let shutdown_type = server_runtime
            .get_handle()
            .block_on(self.main_loop(run_args));

        self.execution_phase_watch
            .send(ExecutionPhase::ShutdownStarted)
            .ok();

        if matches!(shutdown_type, ShutdownType::Graceful) {
            self.execution_phase_watch
                .send(ExecutionPhase::ShutdownGracePeriod)
                .ok();

            // Grace period: existing sessions get this long to finish.
            let exit_timeout = self
                .configuration
                .as_ref()
                .grace_period_seconds
                .unwrap_or(EXIT_TIMEOUT);
            info!("Graceful shutdown: grace period {}s starts", exit_timeout);
            thread::sleep(Duration::from_secs(exit_timeout));
            info!("Graceful shutdown: grace period ends");
        }

        // Give tokio runtimes time to exit
        let shutdown_timeout = match shutdown_type {
            ShutdownType::Quick => Duration::from_secs(0),
            ShutdownType::Graceful => Duration::from_secs(
                self.configuration
                    .as_ref()
                    .graceful_shutdown_timeout_seconds
                    .unwrap_or(5),
            ),
        };

        self.execution_phase_watch
            .send(ExecutionPhase::ShutdownRuntimes)
            .ok();

        // Shut each runtime down on its own thread so they wind down in
        // parallel rather than sequentially.
        let shutdowns: Vec<_> = runtimes
            .into_iter()
            .map(|(rt, name)| {
                info!("Waiting for runtimes to exit!");
                let join = thread::spawn(move || {
                    rt.shutdown_timeout(shutdown_timeout);
                    // NOTE(review): this sleeps for the full timeout again
                    // after shutdown_timeout() returns — confirm whether the
                    // extra wait is intentional.
                    thread::sleep(shutdown_timeout)
                });
                (join, name)
            })
            .collect();
        for (shutdown, name) in shutdowns {
            info!("Waiting for service runtime {} to exit", name);
            if let Err(e) = shutdown.join() {
                // join() only errs if the shutdown thread panicked.
                error!("Failed to shutdown service runtime {}: {:?}", name, e);
            }
            debug!("Service runtime {} has exited", name);
        }
        info!("All runtimes exited, exiting now");

        self.execution_phase_watch
            .send(ExecutionPhase::Terminated)
            .ok();
    }
820
821    fn create_runtime(name: &str, threads: usize, work_steal: bool) -> Runtime {
822        if work_steal {
823            Runtime::new_steal(threads, name)
824        } else {
825            Runtime::new_no_steal(threads, name)
826        }
827    }
828}