Skip to main content

qsu/
rt.rs

1//! Server application wrapper runtime.
2//!
3//! # Overview
4//! The _qsu_'s runtime lives in a layer between the actual server application
//! and the "service subsystems" that server applications can integrate
6//! against:
7//!
8//! <pre>
9//! +---------------------+
10//! |  Server Application |  (HTTP server, Matrix server, etc)
11//! +---------------------+
12//! |       qsu rt        |
13//! +---------------------+
14//! |  Platform service   |  (Windows Service subsystem, systemd, etc)
15//! |      subsystem      |
16//! +---------------------+
17//! </pre>
18//!
19//! The primary goal of the _qsu_ runtime is to provide a consistent interface
20//! that will be mapped to whatever service subsystem it is actually running
//! on.  This includes no special subsystem at all, such as when running the
22//! server application as a regular foreground process (which is common during
23//! development).
24//!
25//! # How service applications are implemented and used
26//! 1. Choose service application type.
27//!
28//!    _qsu_ needs to know what kind of runtime the server application expects.
29//!    Server applications pick the runtime type by implementing a trait, of
30//!    which there are currently three recognized types:
31//!    - [`ServiceHandler`] is used for "non-async" server applications.
32//!    - [`TokioServiceHandler`] is used for server applications that run under
33//!      the tokio executor.
34//!    - [`RocketServiceHandler`] is for server applications that are built on
35//!      top of the Rocket HTTP framework.
//! 2. For the selected service handler type, implement the three methods:
37//!    - `init()` for initializing the service.
38//!    - `run()` is for running the actual server application.
39//!    - `shutdown()` is for shutting down the server application.
40//! 3. Once a service trait has been implemented, the application should create
41//!    a [`RunCtx`] object and call its [`run()`](RunCtx::run()) method,
42//!    passing in the service implementation object.
43//!
44//! ## Service Handler semantics
45//! When a handler is run through the [`RunCtx`] it will first call the
46//! handler's `init()` method.  If it returns `Ok()`, its `run()` method will
47//! be run.
48//!
49//! The handler's `shutdown()` will be called regardless of whether `init()` or
50//! `run()` was successful (the only precondition for `shutdown()` to be called
51//! is that `init()` was called).
52//!
53//! # Passing data to `init()` and `shutdown()`
54//! It's not uncommon that an application may have data that it wishes to pass
55//! to the service handler's `init()` or `shutdown()` handlers.  This can be
56//! accomplished by storing the data in the type that implements the service
57//! handler trait.  However, if there's a lot of data this can cause it to
58//! become cluttered with things that don't really belong there (they only use
59//! the type as a temporary transport mechanism).  As a way to avoid the
60//! clutter, the [`RunCtx`] can be handed types that can be picked up in
61//! `init()`/`shutdown()` (using `InitCtx` and `TermCtx`).
62//!
63//! Use [`RunCtx::init_passthrough()`]/[`RunCtx::init_passthrough_r()`] to pass
64//! data to the `init()` handler, and use
65//! [`RunCtx::term_passthrough()`]/[`RunCtx::term_passthrough_r()`] to pass
66//! data to the `shutdown()` handler.
67//!
68//! # Logging
69//! The runtime will implicitly initialize production logging and development
70//! logging.  To see how these can be configured, see
71//! [lumberjack](crate::lumberjack).
72//!
73//! # Argument parser
74//! _qsu_ offers an [argument parser](crate::argp::ArgParser), which can
75//! abstract away much of the runtime management and service registration.
76
77mod nosvc;
78mod rttype;
79mod signals;
80
81#[cfg(all(target_os = "linux", feature = "systemd"))]
82#[cfg_attr(docsrs, doc(cfg(feature = "systemd")))]
83mod systemd;
84
85#[cfg(windows)]
86pub mod winsvc;
87
88use std::{
89  any::{Any, TypeId},
90  sync::{
91    Arc, OnceLock,
92    atomic::{AtomicU32, Ordering}
93  }
94};
95
96use hashbrown::HashMap;
97
98#[cfg(any(feature = "tokio", feature = "rocket"))]
99use async_trait::async_trait;
100
101#[cfg(feature = "tokio")]
102use tokio::runtime;
103
104use tokio::sync::broadcast;
105
106#[cfg(all(target_os = "linux", feature = "systemd"))]
107use sd_notify::NotifyState;
108
109
110use crate::{err::CbErr, lumberjack::LumberJack};
111
112
/// How the server application process is being run.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RunAs {
  /// Not running against any specific service subsystem.
  Foreground,

  /// Running under a platform service subsystem.
  SvcSubsys
}

/// Keep track of whether the service application is running as foreground
/// process or running under a service subsystem.
static RUNAS: OnceLock<RunAs> = OnceLock::new();

/// Return how the service application is being run.
///
/// Returns `None` for as long as the process-wide run mode has not been
/// recorded yet.  Once recorded, the value never changes for the lifetime of
/// the process.
#[must_use]
pub fn runas() -> Option<RunAs> {
  RUNAS.get().copied()
}
126
127
/// The run time environment.
///
/// Can be used by the application callbacks to determine whether it is running
/// as a service.
///
/// Values can be compared with `==`, which makes it easy for callbacks to
/// branch on the environment they were handed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunEnv {
  /// Running as a foreground process.
  Foreground,

  /// Running as a service.
  ///
  /// If the service subsystem has named services, this will contain the
  /// service name.
  Service(Option<String>)
}
143
144
/// Report the current startup/shutdown state to the platform service
/// subsystem.
///
/// Each service subsystem backend provides an implementation of this trait.
/// All methods take `&self`, so an implementation can be shared between
/// several contexts.
pub(crate) trait StateReporter {
  /// Inform the service subsystem that initialization is running.
  ///
  /// `checkpoint` is the progress counter value for this report and `msg` is
  /// a human-readable status text.
  fn starting(&self, checkpoint: u32, msg: &str);

  /// Inform the service subsystem that initialization has completed.
  fn started(&self);

  /// Inform the service subsystem that termination is running.
  ///
  /// `checkpoint` is the progress counter value for this report and `msg` is
  /// a human-readable status text.
  fn stopping(&self, checkpoint: u32, msg: &str);

  /// Inform the service subsystem that termination has completed.
  fn stopped(&self);
}
160
161
162/// A wrapper around a service state reporter backend implementation.
163#[derive(Clone)]
164#[repr(transparent)]
165pub(crate) struct ServiceReporter(Arc<dyn StateReporter + Send + Sync>);
166
167impl ServiceReporter {
168  pub(crate) fn new<SR>(sr: SR) -> Self
169  where
170    SR: StateReporter + Send + Sync + 'static
171  {
172    Self(Arc::new(sr))
173  }
174
175  pub(crate) fn starting(&self, checkpoint: u32, status: Option<&str>) {
176    let text = status.as_ref().map_or_else(
177      || format!("Starting[{checkpoint}]"),
178      |msg| format!("Starting[{checkpoint}] {msg}")
179    );
180    self.0.starting(checkpoint, &text);
181    log::debug!("{text}");
182  }
183
184  pub(crate) fn started(&self) {
185    self.0.started();
186    log::debug!(
187      "Service initialization has finished and is entering running state"
188    );
189  }
190
191  pub(crate) fn stopping(&self, checkpoint: u32, status: Option<&str>) {
192    let text = status.as_ref().map_or_else(
193      || format!("Stopping[{checkpoint}]"),
194      |msg| format!("Stopping[{checkpoint}] {msg}")
195    );
196    self.0.stopping(checkpoint, &text);
197    log::debug!("{text}");
198  }
199
200  pub(crate) fn stopped(&self) {
201    self.0.stopped();
202    log::debug!("Service termination has finished");
203  }
204}
205
206
/// Context passed to `init()` service application callback.
///
/// This context can be used to query whether the service application is
/// running as foreground process or running within a service subsystem.
///
/// It can also be used to report progress status back to the service
/// subsystems, for platforms that support it.
///
/// In addition it carries the initialization passthrough values, and
/// collects values that `init()` stores with `term_passthrough()`.
pub struct InitCtx {
  /// Run environment handed out by `runenv()`.
  re: RunEnv,
  /// Reporter used to forward startup progress.
  sr: ServiceReporter,
  /// Checkpoint counter; incremented by one for every progress report.
  cnt: Arc<AtomicU32>,
  /// Values available through `get()`/`take()`, keyed by their type.
  passthrough: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Values stored through `term_passthrough()`, keyed by their type.
  passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
221
222impl InitCtx {
223  /// Return context used to identify whether service application is running as
224  /// a foreground process or a system service.
225  #[must_use]
226  pub fn runenv(&self) -> RunEnv {
227    self.re.clone()
228  }
229
230  /// Report startup state to the system service manager.
231  ///
232  /// For foreground processes and services that do not support startup state
233  /// notifications this method has no effect.
234  pub fn report(&self, status: Option<&str>) {
235    let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
236    if let Some(msg) = status {
237      tracing::trace!("Reached init checkpoint {checkpoint}; {}", msg);
238    } else {
239      tracing::trace!("Reached init checkpoint {checkpoint}");
240    }
241    self.sr.starting(checkpoint, status);
242  }
243
244  #[must_use]
245  pub fn get<T>(&self) -> Option<&T>
246  where
247    T: Send + 'static
248  {
249    self
250      .passthrough
251      .get(&TypeId::of::<T>())
252      .and_then(|boxed| boxed.downcast_ref::<T>())
253  }
254
255  /// Extract passthrough buffer.
256  pub fn take<T>(&mut self) -> Option<T>
257  where
258    T: Send + 'static
259  {
260    self
261      .passthrough
262      .remove(&TypeId::of::<T>())
263      .and_then(|boxed| boxed.downcast::<T>().ok())
264      .map(|v| *v)
265  }
266
267  pub fn term_passthrough<T>(&mut self, value: T)
268  where
269    T: Send + Sync + 'static
270  {
271    self
272      .passthrough_term
273      .insert(TypeId::of::<T>(), Box::new(value));
274  }
275}
276
277impl Drop for InitCtx {
278  fn drop(&mut self) {
279    let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
280    self
281      .sr
282      .starting(checkpoint, Some("Initialization phase finished"));
283  }
284}
285
286
/// Context passed to `shutdown()` service application callback.
///
/// This context can be used to query whether the service application is
/// running as foreground process or running within a service subsystem.
///
/// It can also be used to report progress status back to the service
/// subsystems, for platforms that support it.
pub struct TermCtx {
  /// Run environment handed out by `runenv()`.
  re: RunEnv,
  /// Reporter used to forward shutdown progress.
  sr: ServiceReporter,
  /// Checkpoint counter; incremented by one for every progress report.
  cnt: Arc<AtomicU32>,
  /// Values available through `get()`/`take()`, keyed by their type.
  passthrough: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
300
301impl TermCtx {
302  /// Return context used to identify whether service application is running as
303  /// a foreground process or a system service.
304  #[must_use]
305  pub fn runenv(&self) -> RunEnv {
306    self.re.clone()
307  }
308
309  /// Report shutdown state to the system service manager.
310  ///
311  /// For foreground processes and services that do not support shutdown state
312  /// notifications this method has no effect.
313  pub fn report(&self, status: Option<&str>) {
314    let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
315    if let Some(msg) = status {
316      tracing::trace!("Reached term checkpoint {checkpoint}; {msg}");
317    } else {
318      tracing::trace!("Reached term checkpoint {checkpoint}");
319    }
320    self.sr.stopping(checkpoint, status);
321  }
322
323  #[must_use]
324  pub fn get<T: 'static>(&self) -> Option<&T> {
325    self
326      .passthrough
327      .get(&TypeId::of::<T>())
328      .and_then(|boxed| boxed.downcast_ref::<T>())
329  }
330
331  /// Extract passthrough buffer.
332  pub fn take<T: 'static>(&mut self) -> Option<T> {
333    self
334      .passthrough
335      .remove(&TypeId::of::<T>())
336      .and_then(|boxed| boxed.downcast::<T>().ok())
337      .map(|v| *v)
338  }
339}
340
341impl Drop for TermCtx {
342  fn drop(&mut self) {
343    let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
344    self
345      .sr
346      .stopping(checkpoint, Some("Termination phase finished"));
347  }
348}
349
350
/// "Synchronous" (non-`async`) server application.
///
/// Implement this for an object that wraps a server application that does not
/// use an async runtime.
///
/// The runtime calls `init()` first and, if it succeeds, `run()`.
/// `shutdown()` is called whenever `init()` has been called, regardless of
/// whether `init()` or `run()` was successful.
pub trait ServiceHandler {
  /// Application-defined error type returned by the callbacks.
  type AppErr;

  /// Implement to handle service application initialization.
  ///
  /// `ictx` can be used to query the run environment, report startup
  /// progress and access passthrough values.
  ///
  /// # Errors
  /// Application-defined error returned from callback will be wrapped in
  /// [`CbErr::App`] and returned to application.
  fn init(&mut self, ictx: &mut InitCtx) -> Result<(), Self::AppErr>;

  /// Implement to run service application.
  ///
  /// `re` tells whether the application is running as a foreground process
  /// or as a service.
  ///
  /// # Errors
  /// Application-defined error returned from callback will be wrapped in
  /// [`CbErr::App`] and returned to application.
  fn run(&mut self, re: &RunEnv) -> Result<(), Self::AppErr>;

  /// Implement to handle service application termination.
  ///
  /// `tctx` can be used to query the run environment, report shutdown
  /// progress and access passthrough values.
  ///
  /// # Errors
  /// Application-defined error returned from callback will be wrapped in
  /// [`CbErr::App`] and returned to application.
  fn shutdown(&mut self, tctx: &mut TermCtx) -> Result<(), Self::AppErr>;
}
379
380
/// `async` server application built on the tokio runtime.
///
/// Implement this for an object that wraps a server application that uses
/// tokio as an async runtime.
///
/// The callbacks mirror those of [`ServiceHandler`], but are `async`.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[async_trait]
pub trait TokioServiceHandler {
  /// Application-defined error type returned by the callbacks.
  type AppErr;

  /// Implement to handle service application initialization.
  ///
  /// `ictx` can be used to query the run environment, report startup
  /// progress and access passthrough values.
  async fn init(&mut self, ictx: &mut InitCtx) -> Result<(), Self::AppErr>;

  /// Implement to run service application.
  ///
  /// `re` tells whether the application is running as a foreground process
  /// or as a service.
  async fn run(&mut self, re: &RunEnv) -> Result<(), Self::AppErr>;

  /// Implement to handle service application termination.
  ///
  /// `tctx` can be used to query the run environment, report shutdown
  /// progress and access passthrough values.
  async fn shutdown(&mut self, tctx: &mut TermCtx)
  -> Result<(), Self::AppErr>;
}
398
399
/// Rocket server application handler.
///
/// While Rocket is built on top of tokio, it \[Rocket\] wants to initialize
/// tokio itself.
///
/// There are two major ways to write Rocket services using qsu.  The first is
/// to let qsu be aware of the server application's `Rocket` instances.  The
/// application does this by creating the `Rocket` instances in
/// `RocketServiceHandler::init()` and returning them.  _qsu_ will ignite
/// these rockets and pass them to `RocketServiceHandler::run()`.  The
/// application is responsible for launching the rockets at this point.
///
/// The other way to do it is to completely manage the `Rocket` instances in
/// application code (by not returning rocket instances from `init()`).
///
/// Allowing _qsu_ to manage the `Rocket` instances will cause _qsu_ to request
/// graceful shutdown of all `Rocket` instances once a `SvcEvt::Shutdown` is
/// sent by the runtime.
///
/// It is recommended that `ctrlc` shutdown and termination signals are
/// disabled in each `Rocket` instance's configuration, and allow the _qsu_
/// runtime to be responsible for initiating the `Rocket` shutdown.
#[cfg(feature = "rocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))]
#[async_trait]
pub trait RocketServiceHandler {
  /// Application-defined error type returned by the callbacks.
  type AppErr;

  /// Rocket service initialization.
  ///
  /// The returned `Rocket`s will be ignited and their shutdown handlers will
  /// be triggered on shutdown.
  async fn init(
    &mut self,
    ictx: &mut InitCtx
  ) -> Result<Vec<rocket::Rocket<rocket::Build>>, Self::AppErr>;

  /// Server application main entry point.
  ///
  /// If the `init()` trait method returned `Rocket<Build>` instances to the
  /// qsu runtime it will ignite these and pass them to `run()`.  It is the
  /// responsibility of this method to launch the rockets and await them.
  async fn run(
    &mut self,
    rockets: Vec<rocket::Rocket<rocket::Ignite>>,
    re: &RunEnv
  ) -> Result<(), Self::AppErr>;

  /// Implement to handle service application termination.
  ///
  /// `tctx` can be used to query the run environment, report shutdown
  /// progress and access passthrough values.
  async fn shutdown(&mut self, tctx: &mut TermCtx)
  -> Result<(), Self::AppErr>;
}
451
452
/// The means through which the service termination happened.
///
/// This is the payload of the `SvcEvt::Shutdown` event.
#[derive(Copy, Clone, Debug)]
pub enum Demise {
  /// On unixy platforms, this indicates that the termination was initiated
  /// via a `SIGINT` signal.  When running as a foreground process on Windows
  /// this indicates that Ctrl+C was issued.
  Interrupted,

  /// On unixy platforms, this indicates that the termination was initiated
  /// via the `SIGTERM` signal.
  ///
  /// On Windows, running as a service, this indicates that the service
  /// subsystem requested service to be shut down. Running as a foreground
  /// process, this means that Ctrl+Break was issued or that the console
  /// window was closed.
  Terminated,

  /// Reached the end of the service application without any external requests
  /// to terminate.
  ReachedEnd
}
474
/// User-defined signal identifier.
///
/// This is the payload of the `SvcEvt::User` event.
#[derive(Copy, Clone, Debug)]
pub enum UserSig {
  /// `SIGUSR1`
  Sig1,

  /// `SIGUSR2`
  Sig2
}
483
484
/// Event notifications that originate from the service subsystem that is
/// controlling the server application.
///
/// These events are delivered to the application's service event handler
/// callback.
#[derive(Copy, Clone, Debug)]
pub enum SvcEvt {
  /// User events.
  ///
  /// These will be generated on unixy platforms if the process receives
  /// `SIGUSR1` or `SIGUSR2`.
  User(UserSig),

  /// Service subsystem has requested that the server application should pause
  /// its operations.
  ///
  /// Only the Windows service subsystem will emit these events.
  Pause,

  /// Service subsystem has requested that the server application should
  /// resume its operations.
  ///
  /// Only the Windows service subsystem will emit these events.
  Resume,

  /// Service subsystem has requested that the service's configuration should
  /// be reread.
  ///
  /// On Unixy platforms this is triggered by `SIGHUP`, and is unsupported on
  /// Windows.
  ReloadConf,

  /// The service application is terminating.  The `Demise` value indicates
  /// the reason for the shutdown.
  Shutdown(Demise)
}
518
519
/// The server application runtime type.
///
/// Each variant pairs a service event callback with the service handler
/// implementation for that kind of runtime.
// large_enum_variant isn't relevant here because only one instance of this is
// ever created for a process.
#[allow(clippy::large_enum_variant, clippy::module_name_repetitions)]
pub enum SrvAppRt<ApEr> {
  /// A plain non-async (sometimes referred to as "blocking") server
  /// application.
  Sync {
    /// Callback that receives service events.
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    /// The application's service handler.
    rt_handler: Box<dyn ServiceHandler<AppErr = ApEr> + Send>
  },

  /// Initialize a tokio runtime, and call each application handler from an
  /// async context.
  #[cfg(feature = "tokio")]
  #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
  Tokio {
    /// Optional tokio runtime builder.
    // NOTE(review): presumably a default runtime is built when this is
    // `None` -- confirm against `rttype::tokio_main()`.
    rtbldr: Option<runtime::Builder>,
    /// Callback that receives service events.
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    /// The application's service handler.
    rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>
  },

  /// Allow Rocket to initialize the tokio runtime, and call each application
  /// handler from an async context.
  #[cfg(feature = "rocket")]
  #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))]
  Rocket {
    /// Callback that receives service events.
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    /// The application's service handler.
    rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>
  }
}
551
552
/// Service runner context.
///
/// Collects the settings and passthrough values needed to launch a server
/// application, either as a foreground process or under a service subsystem.
pub struct RunCtx {
  /// Run under the platform's service subsystem rather than in the
  /// foreground.
  service: bool,
  /// Name of the service.
  svcname: String,
  /// Whether logging should be initialized when the application is launched.
  log_init: bool,
  /// Values handed to the `init()` callback, keyed by their type.
  passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Values handed to the `shutdown()` callback, keyed by their type.
  passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Set when the context is used from tests.
  test_mode: bool
}
562
563impl RunCtx {
564  /// Run as a systemd service.
565  #[cfg(all(target_os = "linux", feature = "systemd"))]
566  fn systemd<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
567  where
568    ApEr: Send + std::fmt::Debug
569  {
570    LumberJack::default()
571      .set_init(self.log_init)
572      .service()
573      .init()?;
574
575    tracing::debug!("Running service '{}'", self.svcname);
576
577    let sr = systemd::ServiceReporter {};
578    let sr = ServiceReporter::new(sr);
579
580    let re = RunEnv::Service(Some(self.svcname.clone()));
581
582    match st {
583      SrvAppRt::Sync {
584        svcevt_handler,
585        rt_handler
586      } => rttype::sync_main(rttype::SyncMainParams {
587        re,
588        svcevt_handler,
589        rt_handler,
590        sr,
591        svcevt_ch: None,
592        passthrough_init: self.passthrough_init,
593        passthrough_term: self.passthrough_term,
594        test_mode: self.test_mode
595      }),
596      SrvAppRt::Tokio {
597        rtbldr,
598        svcevt_handler,
599        rt_handler
600      } => rttype::tokio_main(
601        rtbldr,
602        rttype::TokioMainParams {
603          re,
604          svcevt_handler,
605          rt_handler,
606          sr,
607          svcevt_ch: None,
608          passthrough_init: self.passthrough_init,
609          passthrough_term: self.passthrough_term
610        }
611      ),
612      #[cfg(feature = "rocket")]
613      SrvAppRt::Rocket {
614        svcevt_handler,
615        rt_handler
616      } => rttype::rocket_main(rttype::RocketMainParams {
617        re,
618        svcevt_handler,
619        rt_handler,
620        sr,
621        svcevt_ch: None,
622        passthrough_init: self.passthrough_init,
623        passthrough_term: self.passthrough_term
624      })
625    }
626  }
627
628  /// Run as a Windows service.
629  #[cfg(windows)]
630  fn winsvc<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
631  where
632    ApEr: Send + 'static + std::fmt::Debug
633  {
634    winsvc::run(
635      &self.svcname,
636      st,
637      self.passthrough_init,
638      self.passthrough_term
639    )?;
640
641    Ok(())
642  }
643
644  /// Run as a foreground server
645  fn foreground<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
646  where
647    ApEr: Send + std::fmt::Debug
648  {
649    LumberJack::default().set_init(self.log_init).init()?;
650
651    tracing::debug!("Running service '{}'", self.svcname);
652
653    let sr = nosvc::ServiceReporter {};
654    let sr = ServiceReporter::new(sr);
655
656    match st {
657      SrvAppRt::Sync {
658        svcevt_handler,
659        rt_handler
660      } => rttype::sync_main(rttype::SyncMainParams {
661        re: RunEnv::Foreground,
662        svcevt_handler,
663        rt_handler,
664        sr,
665        svcevt_ch: None,
666        passthrough_init: self.passthrough_init,
667        passthrough_term: self.passthrough_term,
668        test_mode: self.test_mode
669      }),
670
671      #[cfg(feature = "tokio")]
672      SrvAppRt::Tokio {
673        rtbldr,
674        svcevt_handler,
675        rt_handler
676      } => rttype::tokio_main(
677        rtbldr,
678        rttype::TokioMainParams {
679          re: RunEnv::Foreground,
680          svcevt_handler,
681          rt_handler,
682          sr,
683          svcevt_ch: None,
684          passthrough_init: self.passthrough_init,
685          passthrough_term: self.passthrough_term
686        }
687      ),
688
689      #[cfg(feature = "rocket")]
690      SrvAppRt::Rocket {
691        svcevt_handler,
692        rt_handler
693      } => rttype::rocket_main(rttype::RocketMainParams {
694        re: RunEnv::Foreground,
695        svcevt_handler,
696        rt_handler,
697        sr,
698        svcevt_ch: None,
699        passthrough_init: self.passthrough_init,
700        passthrough_term: self.passthrough_term
701      })
702    }
703  }
704}
705
706impl RunCtx {
707  /// Create a new service running context.
708  #[must_use]
709  pub fn new(name: &str) -> Self {
710    Self {
711      service: false,
712      svcname: name.into(),
713      log_init: true,
714      passthrough_init: HashMap::new(),
715      passthrough_term: HashMap::new(),
716      test_mode: false
717    }
718  }
719
720  /// Set an initialization passthrough buffer.
721  ///
722  /// The `data` can be extracted in the service handler's `init()` method by
723  /// calling `InitCtx::take_passthrough()`.
724  #[must_use]
725  pub fn init_passthrough<T>(mut self, data: T) -> Self
726  where
727    T: Send + Sync + 'static
728  {
729    self.init_passthrough_r(data);
730    self
731  }
732
733  /// Same as [`RunCtx::init_passthrough()`], but works with reference to
734  /// `Self`.
735  pub fn init_passthrough_r<T>(&mut self, data: T) -> &mut Self
736  where
737    T: Send + Sync + 'static
738  {
739    self
740      .passthrough_init
741      .insert(TypeId::of::<T>(), Box::new(data));
742    self
743  }
744
745  /// Set an termination passthrough buffer.
746  ///
747  /// The `data` can be extracted in the service handler's `init()` method by
748  /// calling `InitCtx::take_passthrough()`.
749  #[must_use]
750  pub fn term_passthrough<T>(mut self, data: T) -> Self
751  where
752    T: Send + Sync + 'static
753  {
754    self.term_passthrough_r(data);
755    self
756  }
757
758  /// Same as [`RunCtx::term_passthrough()`], but works with reference to
759  /// `Self`.
760  pub fn term_passthrough_r<T>(&mut self, data: T) -> &mut Self
761  where
762    T: Send + Sync + 'static
763  {
764    self
765      .passthrough_term
766      .insert(TypeId::of::<T>(), Box::new(data));
767    self
768  }
769
770  /// Enable test mode.
771  ///
772  /// This method is intended for tests only.
773  ///
774  /// qsu performs a few global initialization that will fail if run repeatedly
775  /// within the same process.  This causes some problem when running tests,
776  /// because rust may run tests in threads within the same process.
777  #[doc(hidden)]
778  #[must_use]
779  pub const fn test_mode(mut self) -> Self {
780    self.log_init = false;
781    self.test_mode = true;
782    self
783  }
784
785  /// Disable logging/tracing initialization.
786  ///
787  /// This is useful in tests because tests may run in different threads within
788  /// the same process, causing the log/tracing initialization to panic.
789  #[doc(hidden)]
790  #[must_use]
791  pub const fn log_init(mut self, flag: bool) -> Self {
792    self.log_init = flag;
793    self
794  }
795
796  /// Reference version of [`RunCtx::log_init()`].
797  #[doc(hidden)]
798  pub const fn log_init_ref(&mut self, flag: bool) -> &mut Self {
799    self.log_init = flag;
800    self
801  }
802
803  /// Mark this run context to run under the operating system's subservice, if
804  /// one is available on this platform.
805  #[must_use]
806  pub const fn service(mut self) -> Self {
807    self.service = true;
808    self
809  }
810
811  /// Mark this run context to run under the operating system's subservice, if
812  /// one is available on this platform.
813  pub const fn service_ref(&mut self) -> &mut Self {
814    self.service = true;
815    self
816  }
817
818  #[must_use]
819  pub const fn is_service(&self) -> bool {
820    self.service
821  }
822
823  /// Launch the application.
824  ///
825  /// If this `RunCtx` has been marked as a _service_ then it will perform the
826  /// appropriate service subsystem integration before running the actual
827  /// server application code.
828  ///
829  /// This function must only be called from the main thread of the process,
830  /// and must be called before any other threads are started.
831  ///
832  /// # Errors
833  /// [`CbErr::App`] is returned, containing application-specific error, if n
834  /// application callback returned an error. [`CbErr::Lib`] indicates that an
835  /// error occurred in the qsu runtime.
836  pub fn run<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
837  where
838    ApEr: Send + 'static + std::fmt::Debug
839  {
840    if self.service {
841      let _ = RUNAS.set(RunAs::SvcSubsys);
842
843      #[cfg(all(target_os = "linux", feature = "systemd"))]
844      self.systemd(st)?;
845
846      #[cfg(windows)]
847      self.winsvc(st)?;
848
849      // ToDo: We should check for other platforms here (like macOS/launchd)
850    } else {
851      let _ = RUNAS.set(RunAs::Foreground);
852
853      // Do not run against any specific service subsystem.  Despite its name
854      // this isn't necessarily running as a foreground process; some service
855      // subsystems do not make a distinction.  Perhaps a better mental model
856      // is that certain service subsystems expects to run regular "foreground"
857      // processes.
858      self.foreground(st)?;
859    }
860
861    Ok(())
862  }
863
864  /// Convenience method around [`Self::run()`] using [`SrvAppRt::Sync`].
865  #[allow(clippy::missing_errors_doc)]
866  pub fn run_sync<ApEr>(
867    self,
868    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
869    rt_handler: Box<dyn ServiceHandler<AppErr = ApEr> + Send>
870  ) -> Result<(), CbErr<ApEr>>
871  where
872    ApEr: Send + 'static + std::fmt::Debug
873  {
874    self.run(SrvAppRt::Sync {
875      svcevt_handler,
876      rt_handler
877    })
878  }
879
880  /// Convenience method around [`Self::run()`] using [`SrvAppRt::Tokio`].
881  #[cfg(feature = "tokio")]
882  #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
883  #[allow(clippy::missing_errors_doc)]
884  pub fn run_tokio<ApEr>(
885    self,
886    rtbldr: Option<runtime::Builder>,
887    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
888    rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>
889  ) -> Result<(), CbErr<ApEr>>
890  where
891    ApEr: Send + 'static + std::fmt::Debug
892  {
893    self.run(SrvAppRt::Tokio {
894      rtbldr,
895      svcevt_handler,
896      rt_handler
897    })
898  }
899
900  /// Convenience method around [`Self::run()`] using [`SrvAppRt::Rocket`].
901  #[cfg(feature = "rocket")]
902  #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))]
903  #[allow(clippy::missing_errors_doc)]
904  pub fn run_rocket<ApEr>(
905    self,
906    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
907    rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>
908  ) -> Result<(), CbErr<ApEr>>
909  where
910    ApEr: Send + 'static + std::fmt::Debug
911  {
912    self.run(SrvAppRt::Rocket {
913      svcevt_handler,
914      rt_handler
915    })
916  }
917}
918
919/// Internal thread used to run service event handler.
920//#[tracing::instrument(name = "svcevtthread", skip_all)]
921fn svcevt_thread(
922  mut rx: broadcast::Receiver<SvcEvt>,
923  mut evt_handler: Box<dyn FnMut(SvcEvt) + Send>
924) {
925  while let Ok(msg) = rx.blocking_recv() {
926    tracing::debug!("Received {:?}", msg);
927
928    #[cfg(all(target_os = "linux", feature = "systemd"))]
929    if matches!(msg, SvcEvt::ReloadConf) {
930      //
931      // Reload has been requested -- report RELOADING=1 and MONOTONIC_USEC to
932      // systemd before calling the application callback.
933      //
934      let ts =
935        nix::time::clock_gettime(nix::time::ClockId::CLOCK_MONOTONIC).unwrap();
936      let s = format!(
937        "RELOADING=1\nMONOTONIC_USEC={}{:06}",
938        ts.tv_sec(),
939        ts.tv_nsec() / 1000
940      );
941      tracing::trace!("Sending notification to systemd: {}", s);
942
943      let custom = NotifyState::Custom(&s);
944      if let Err(e) = sd_notify::notify(&[custom]) {
945        log::error!("Unable to send RELOADING=1 notification to systemd; {e}");
946      }
947    }
948
949    //
950    // Call the application callback
951    //
952    evt_handler(msg);
953
954    #[cfg(all(target_os = "linux", feature = "systemd"))]
955    if matches!(msg, SvcEvt::ReloadConf) {
956      //
957      // This is a reload; report READY=1 to systemd after the application
958      // callback has been called.
959      //
960      tracing::trace!("Sending notification to systemd: READY=1");
961      if let Err(e) = sd_notify::notify(&[NotifyState::Ready]) {
962        log::error!("Unable to send READY=1 notification to systemd; {e}");
963      }
964    }
965
966    // If the event message was either shutdown or terminate, break out of loop
967    // so the thread will terminate
968    if let SvcEvt::Shutdown(_) = msg {
969      tracing::debug!("Terminating thread");
970      // break out of loop when the service shutdown has been rquested
971      break;
972    }
973  }
974}
975
976// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :