// qsu/rt.rs
1//! Server application wrapper runtime.
2//!
3//! # Overview
4//! The _qsu_'s runtime lives in a layer between the actual server application
//! and the "service subsystems" that server applications can integrate
6//! against:
7//!
8//! <pre>
9//! +---------------------+
10//! | Server Application | (HTTP server, Matrix server, etc)
11//! +---------------------+
12//! | qsu rt |
13//! +---------------------+
14//! | Platform service | (Windows Service subsystem, systemd, etc)
15//! | subsystem |
16//! +---------------------+
17//! </pre>
18//!
19//! The primary goal of the _qsu_ runtime is to provide a consistent interface
20//! that will be mapped to whatever service subsystem it is actually running
//! on. This includes no special subsystem at all, such as when running the
22//! server application as a regular foreground process (which is common during
23//! development).
24//!
25//! # How service applications are implemented and used
26//! 1. Choose service application type.
27//!
28//! _qsu_ needs to know what kind of runtime the server application expects.
29//! Server applications pick the runtime type by implementing a trait, of
30//! which there are currently three recognized types:
31//! - [`ServiceHandler`] is used for "non-async" server applications.
32//! - [`TokioServiceHandler`] is used for server applications that run under
33//! the tokio executor.
34//! - [`RocketServiceHandler`] is for server applications that are built on
35//! top of the Rocket HTTP framework.
//! 2. For the selected service handler type, implement the three methods:
37//! - `init()` for initializing the service.
38//! - `run()` is for running the actual server application.
39//! - `shutdown()` is for shutting down the server application.
40//! 3. Once a service trait has been implemented, the application should create
41//! a [`RunCtx`] object and call its [`run()`](RunCtx::run()) method,
42//! passing in the service implementation object.
43//!
44//! ## Service Handler semantics
45//! When a handler is run through the [`RunCtx`] it will first call the
46//! handler's `init()` method. If it returns `Ok()`, its `run()` method will
47//! be run.
48//!
49//! The handler's `shutdown()` will be called regardless of whether `init()` or
50//! `run()` was successful (the only precondition for `shutdown()` to be called
51//! is that `init()` was called).
52//!
53//! # Passing data to `init()` and `shutdown()`
54//! It's not uncommon that an application may have data that it wishes to pass
55//! to the service handler's `init()` or `shutdown()` handlers. This can be
56//! accomplished by storing the data in the type that implements the service
57//! handler trait. However, if there's a lot of data this can cause it to
58//! become cluttered with things that don't really belong there (they only use
59//! the type as a temporary transport mechanism). As a way to avoid the
60//! clutter, the [`RunCtx`] can be handed types that can be picked up in
61//! `init()`/`shutdown()` (using `InitCtx` and `TermCtx`).
62//!
63//! Use [`RunCtx::init_passthrough()`]/[`RunCtx::init_passthrough_r()`] to pass
64//! data to the `init()` handler, and use
65//! [`RunCtx::term_passthrough()`]/[`RunCtx::term_passthrough_r()`] to pass
66//! data to the `shutdown()` handler.
67//!
68//! # Logging
69//! The runtime will implicitly initialize production logging and development
70//! logging. To see how these can be configured, see
71//! [lumberjack](crate::lumberjack).
72//!
73//! # Argument parser
74//! _qsu_ offers an [argument parser](crate::argp::ArgParser), which can
75//! abstract away much of the runtime management and service registration.
76
77mod nosvc;
78mod rttype;
79mod signals;
80
81#[cfg(all(target_os = "linux", feature = "systemd"))]
82#[cfg_attr(docsrs, doc(cfg(feature = "systemd")))]
83mod systemd;
84
85#[cfg(windows)]
86pub mod winsvc;
87
88use std::{
89 any::{Any, TypeId},
90 sync::{
91 Arc, OnceLock,
92 atomic::{AtomicU32, Ordering}
93 }
94};
95
96use hashbrown::HashMap;
97
98#[cfg(any(feature = "tokio", feature = "rocket"))]
99use async_trait::async_trait;
100
101#[cfg(feature = "tokio")]
102use tokio::runtime;
103
104use tokio::sync::broadcast;
105
106#[cfg(all(target_os = "linux", feature = "systemd"))]
107use sd_notify::NotifyState;
108
109
110use crate::{err::CbErr, lumberjack::LumberJack};
111
112
/// How the process was launched: as a plain foreground process or under a
/// platform service subsystem.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum RunAs {
  /// Running as a regular foreground process.
  Foreground,

  /// Running under a platform service subsystem (Windows services, systemd,
  /// etc).
  SvcSubsys
}
118
/// Keep track of whether the service application is running as foreground
/// process or running under a service subsystem.
static RUNAS: OnceLock<RunAs> = OnceLock::new();

/// Return how the runtime was launched.
///
/// Returns `None` until the runtime has been started; the value is set by
/// [`RunCtx::run()`].
pub fn runas() -> Option<RunAs> {
  RUNAS.get().copied()
}
126
127
/// The run time environment.
///
/// Can be used by the application callbacks to determine whether it is running
/// as a service.
#[derive(Debug, Clone)]
pub enum RunEnv {
  /// Running as a foreground process.
  Foreground,

  /// Running as a service.
  ///
  /// If the service subsystem has named services, this will contain the
  /// service name.  `None` means the subsystem does not name its services.
  Service(Option<String>)
}
143
144
/// Report the current startup/shutdown state to the platform service
/// subsystem.
///
/// Implemented by the per-platform backends (e.g. `systemd::ServiceReporter`
/// and `nosvc::ServiceReporter`); invoked through [`ServiceReporter`].
pub(crate) trait StateReporter {
  /// Inform the service subsystem that initialization is running.
  fn starting(&self, checkpoint: u32, msg: &str);

  /// Inform the service subsystem that initialization has completed.
  fn started(&self);

  /// Inform the service subsystem that termination is running.
  fn stopping(&self, checkpoint: u32, msg: &str);

  /// Inform the service subsystem that termination has completed.
  fn stopped(&self);
}
160
161
162/// A wrapper around a service state reporter backend implementation.
163#[derive(Clone)]
164#[repr(transparent)]
165pub(crate) struct ServiceReporter(Arc<dyn StateReporter + Send + Sync>);
166
167impl ServiceReporter {
168 pub(crate) fn new<SR>(sr: SR) -> Self
169 where
170 SR: StateReporter + Send + Sync + 'static
171 {
172 Self(Arc::new(sr))
173 }
174
175 pub(crate) fn starting(&self, checkpoint: u32, status: Option<&str>) {
176 let text = status.as_ref().map_or_else(
177 || format!("Starting[{checkpoint}]"),
178 |msg| format!("Starting[{checkpoint}] {msg}")
179 );
180 self.0.starting(checkpoint, &text);
181 log::debug!("{text}");
182 }
183
184 pub(crate) fn started(&self) {
185 self.0.started();
186 log::debug!(
187 "Service initialization has finished and is entering running state"
188 );
189 }
190
191 pub(crate) fn stopping(&self, checkpoint: u32, status: Option<&str>) {
192 let text = status.as_ref().map_or_else(
193 || format!("Stopping[{checkpoint}]"),
194 |msg| format!("Stopping[{checkpoint}] {msg}")
195 );
196 self.0.stopping(checkpoint, &text);
197 log::debug!("{text}");
198 }
199
200 pub(crate) fn stopped(&self) {
201 self.0.stopped();
202 log::debug!("Service termination has finished");
203 }
204}
205
206
/// Context passed to `init()` service application callback.
///
/// This context can be used to query whether the service application is
/// running as foreground process or running within a service subsystem.
///
/// It can also be used to report progress status back to the service
/// subsystems, for platforms that support it.
pub struct InitCtx {
  /// The runtime environment (foreground vs. service).
  re: RunEnv,
  /// Reporter used to forward startup checkpoints to the service subsystem.
  sr: ServiceReporter,
  /// Monotonically increasing startup checkpoint counter, shared with the
  /// runtime.
  cnt: Arc<AtomicU32>,
  /// Buffers registered via `RunCtx::init_passthrough()`, keyed by `TypeId`.
  passthrough: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Buffers destined for the `shutdown()` callback (see
  /// `InitCtx::term_passthrough()`).
  passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
221
222impl InitCtx {
223 /// Return context used to identify whether service application is running as
224 /// a foreground process or a system service.
225 #[must_use]
226 pub fn runenv(&self) -> RunEnv {
227 self.re.clone()
228 }
229
230 /// Report startup state to the system service manager.
231 ///
232 /// For foreground processes and services that do not support startup state
233 /// notifications this method has no effect.
234 pub fn report(&self, status: Option<&str>) {
235 let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
236 if let Some(msg) = status {
237 tracing::trace!("Reached init checkpoint {checkpoint}; {}", msg);
238 } else {
239 tracing::trace!("Reached init checkpoint {checkpoint}");
240 }
241 self.sr.starting(checkpoint, status);
242 }
243
244 #[must_use]
245 pub fn get<T>(&self) -> Option<&T>
246 where
247 T: Send + 'static
248 {
249 self
250 .passthrough
251 .get(&TypeId::of::<T>())
252 .and_then(|boxed| boxed.downcast_ref::<T>())
253 }
254
255 /// Extract passthrough buffer.
256 pub fn take<T>(&mut self) -> Option<T>
257 where
258 T: Send + 'static
259 {
260 self
261 .passthrough
262 .remove(&TypeId::of::<T>())
263 .and_then(|boxed| boxed.downcast::<T>().ok())
264 .map(|v| *v)
265 }
266
267 pub fn term_passthrough<T>(&mut self, value: T)
268 where
269 T: Send + Sync + 'static
270 {
271 self
272 .passthrough_term
273 .insert(TypeId::of::<T>(), Box::new(value));
274 }
275}
276
/// Emit a final startup checkpoint when the init context goes out of scope,
/// marking the end of the initialization phase.
impl Drop for InitCtx {
  fn drop(&mut self) {
    let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
    self
      .sr
      .starting(checkpoint, Some("Initialization phase finished"));
  }
}
285
286
/// Context passed to `term()` service application callback.
///
/// This context can be used to query whether the service application is
/// running as foreground process or running within a service subsystem.
///
/// It can also be used to report progress status back to the service
/// subsystems, for platforms that support it.
pub struct TermCtx {
  /// The runtime environment (foreground vs. service).
  re: RunEnv,
  /// Reporter used to forward shutdown checkpoints to the service subsystem.
  sr: ServiceReporter,
  /// Monotonically increasing shutdown checkpoint counter, shared with the
  /// runtime.
  cnt: Arc<AtomicU32>,
  /// Buffers registered via `RunCtx::term_passthrough()` or
  /// `InitCtx::term_passthrough()`, keyed by `TypeId`.
  passthrough: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}
300
301impl TermCtx {
302 /// Return context used to identify whether service application is running as
303 /// a foreground process or a system service.
304 #[must_use]
305 pub fn runenv(&self) -> RunEnv {
306 self.re.clone()
307 }
308
309 /// Report shutdown state to the system service manager.
310 ///
311 /// For foreground processes and services that do not support shutdown state
312 /// notifications this method has no effect.
313 pub fn report(&self, status: Option<&str>) {
314 let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
315 if let Some(msg) = status {
316 tracing::trace!("Reached term checkpoint {checkpoint}; {msg}");
317 } else {
318 tracing::trace!("Reached term checkpoint {checkpoint}");
319 }
320 self.sr.stopping(checkpoint, status);
321 }
322
323 #[must_use]
324 pub fn get<T: 'static>(&self) -> Option<&T> {
325 self
326 .passthrough
327 .get(&TypeId::of::<T>())
328 .and_then(|boxed| boxed.downcast_ref::<T>())
329 }
330
331 /// Extract passthrough buffer.
332 pub fn take<T: 'static>(&mut self) -> Option<T> {
333 self
334 .passthrough
335 .remove(&TypeId::of::<T>())
336 .and_then(|boxed| boxed.downcast::<T>().ok())
337 .map(|v| *v)
338 }
339}
340
/// Emit a final shutdown checkpoint when the term context goes out of scope,
/// marking the end of the termination phase.
impl Drop for TermCtx {
  fn drop(&mut self) {
    let checkpoint = self.cnt.fetch_add(1, Ordering::SeqCst);
    self
      .sr
      .stopping(checkpoint, Some("Termination phase finished"));
  }
}
349
350
/// "Synchronous" (non-`async`) server application.
///
/// Implement this for an object that wraps a server application that does not
/// use an async runtime.
pub trait ServiceHandler {
  /// Application-defined error type; returned to the application wrapped in
  /// [`CbErr::App`].
  type AppErr;

  /// Implement to handle service application initialization.
  ///
  /// # Errors
  /// Application-defined error returned from callback will be wrapped in
  /// [`CbErr::App`] and returned to application.
  fn init(&mut self, ictx: &mut InitCtx) -> Result<(), Self::AppErr>;

  /// Implement to run service application.
  ///
  /// # Errors
  /// Application-defined error returned from callback will be wrapped in
  /// [`CbErr::App`] and returned to application.
  fn run(&mut self, re: &RunEnv) -> Result<(), Self::AppErr>;

  /// Implement to handle service application termination.
  ///
  /// # Errors
  /// Application-defined error returned from callback will be wrapped in
  /// [`CbErr::App`] and returned to application.
  fn shutdown(&mut self, tctx: &mut TermCtx) -> Result<(), Self::AppErr>;
}
379
380
/// `async` server application built on the tokio runtime.
///
/// Implement this for an object that wraps a server application that uses
/// tokio as an async runtime.
#[cfg(feature = "tokio")]
#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
#[async_trait]
pub trait TokioServiceHandler {
  /// Application-defined error type; returned to the application wrapped in
  /// [`CbErr::App`].
  type AppErr;

  /// Handle service application initialization; async counterpart of
  /// [`ServiceHandler::init()`].
  async fn init(&mut self, ictx: &mut InitCtx) -> Result<(), Self::AppErr>;

  /// Run the service application; async counterpart of
  /// [`ServiceHandler::run()`].
  async fn run(&mut self, re: &RunEnv) -> Result<(), Self::AppErr>;

  /// Handle service application termination; async counterpart of
  /// [`ServiceHandler::shutdown()`].
  async fn shutdown(&mut self, tctx: &mut TermCtx)
    -> Result<(), Self::AppErr>;
}
398
399
/// Rocket server application handler.
///
/// While Rocket is built on top of tokio, it \[Rocket\] wants to initialize
/// tokio itself.
///
/// There are two major ways to write Rocket services using qsu; either the
/// application can let qsu be aware of the server applications' `Rocket`
/// instances. It does this by creating the `Rocket` instances in
/// `RocketServiceHandler::init()` and returns them. _qsu_ will ignite these
/// rockets and pass them to `RocketServiceHandler::run()`. The application is
/// responsible for launching the rockets at this point.
///
/// The other way to do it is to completely manage the `Rocket` instances in
/// application code (by not returning rocket instances from `init()`).
///
/// Allowing _qsu_ to manage the `Rocket` instances will cause _qsu_ to request
/// graceful shutdown of all `Rocket` instances once a `SvcEvt::Shutdown` is
/// sent by the runtime.
///
/// It is recommended that `ctrlc` shutdown and termination signals are
/// disabled in each `Rocket` instance's configuration, and allow the _qsu_
/// runtime to be responsible for initiating the `Rocket` shutdown.
#[cfg(feature = "rocket")]
#[cfg_attr(docsrs, doc(cfg(feature = "rocket")))]
#[async_trait]
pub trait RocketServiceHandler {
  /// Application-defined error type; returned to the application wrapped in
  /// [`CbErr::App`].
  type AppErr;

  /// Rocket service initialization.
  ///
  /// The returned `Rocket`s will be ignited and their shutdown handlers will
  /// be triggered on shutdown.
  async fn init(
    &mut self,
    ictx: &mut InitCtx
  ) -> Result<Vec<rocket::Rocket<rocket::Build>>, Self::AppErr>;

  /// Server application main entry point.
  ///
  /// If the `init()` trait method returned `Rocket<Build>` instances to the
  /// qsu runtime it will ignite these and pass them to `run()`. It is the
  /// responsibility of this method to launch the rockets and await them.
  async fn run(
    &mut self,
    rockets: Vec<rocket::Rocket<rocket::Ignite>>,
    re: &RunEnv
  ) -> Result<(), Self::AppErr>;

  /// Handle service application termination.
  async fn shutdown(&mut self, tctx: &mut TermCtx)
    -> Result<(), Self::AppErr>;
}
451
452
/// The means through which the service termination happened.
///
/// Delivered to the application inside [`SvcEvt::Shutdown`].
#[derive(Copy, Clone, Debug)]
pub enum Demise {
  /// On unixy platforms, this indicates that the termination was initiated
  /// via a `SIGINT` signal. When running as a foreground process on Windows
  /// this indicates that Ctrl+C was issued.
  Interrupted,

  /// On unixy platforms, this indicates that the termination was initiated
  /// via the `SIGTERM` signal.
  ///
  /// On Windows, running as a service, this indicates that the service
  /// subsystem requested service to be shut down. Running as a foreground
  /// process, this means that Ctrl+Break was issued or that the console
  /// window was closed.
  Terminated,

  /// Reached the end of the service application without any external requests
  /// to terminate.
  ReachedEnd
}
474
/// User signal discriminator, delivered to the application inside
/// [`SvcEvt::User`] on unixy platforms.
#[derive(Copy, Clone, Debug)]
pub enum UserSig {
  /// SIGUSR1
  Sig1,

  /// SIGUSR2
  Sig2
}
483
484
/// Event notifications that originate from the service subsystem that is
/// controlling the server application.
#[derive(Copy, Clone, Debug)]
pub enum SvcEvt {
  /// User events.
  ///
  /// These will be generated on unixy platform if the process receives
  /// SIGUSR1 or SIGUSR2.
  User(UserSig),

  /// Service subsystem has requested that the server application should pause
  /// its operations.
  ///
  /// Only the Windows service subsystem will emit these events.
  Pause,

  /// Service subsystem has requested that the server application should
  /// resume its operations.
  ///
  /// Only the Windows service subsystem will emit these events.
  Resume,

  /// Service subsystem has requested that the services configuration should
  /// be reread.
  ///
  /// On Unixy platforms this is triggered by SIGHUP, and is unsupported on
  /// Windows.
  ReloadConf,

  /// The service application is terminating. The `Demise` value indicates
  /// the reason for the shutdown.
  Shutdown(Demise)
}
518
519
/// The server application runtime type.
// large_enum_variant isn't relevant here because only one instance of this is
// ever created for a process.
#[allow(clippy::large_enum_variant, clippy::module_name_repetitions)]
pub enum SrvAppRt<ApEr> {
  /// A plain non-async (sometimes referred to as "blocking") server
  /// application.
  Sync {
    /// Callback invoked for each [`SvcEvt`] service event.
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    /// The application's [`ServiceHandler`] implementation.
    rt_handler: Box<dyn ServiceHandler<AppErr = ApEr> + Send>
  },

  /// Initialize a tokio runtime, and call each application handler from an
  /// async context.
  #[cfg(feature = "tokio")]
  #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
  Tokio {
    /// Optional pre-configured tokio runtime builder.
    // NOTE(review): presumably a default runtime is constructed when `None`
    // -- confirm against `rttype::tokio_main()`.
    rtbldr: Option<runtime::Builder>,
    /// Callback invoked for each [`SvcEvt`] service event.
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    /// The application's [`TokioServiceHandler`] implementation.
    rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>
  },

  /// Allow Rocket to initialize the tokio runtime, and call each application
  /// handler from an async context.
  #[cfg(feature = "rocket")]
  #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))]
  Rocket {
    /// Callback invoked for each [`SvcEvt`] service event.
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    /// The application's [`RocketServiceHandler`] implementation.
    rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>
  }
}
551
552
/// Service runner context.
pub struct RunCtx {
  /// Whether to integrate with the platform's service subsystem.
  service: bool,
  /// Service name, reported to the service subsystem and the logs.
  svcname: String,
  /// Whether to initialize logging/tracing (disabled for tests).
  log_init: bool,
  /// Buffers handed to the handler's `init()` callback, keyed by `TypeId`.
  passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Buffers handed to the handler's `shutdown()` callback, keyed by
  /// `TypeId`.
  passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Relax global initialization so repeated in-process runs (tests) work.
  test_mode: bool
}
562
563impl RunCtx {
564 /// Run as a systemd service.
565 #[cfg(all(target_os = "linux", feature = "systemd"))]
566 fn systemd<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
567 where
568 ApEr: Send + std::fmt::Debug
569 {
570 LumberJack::default()
571 .set_init(self.log_init)
572 .service()
573 .init()?;
574
575 tracing::debug!("Running service '{}'", self.svcname);
576
577 let sr = systemd::ServiceReporter {};
578 let sr = ServiceReporter::new(sr);
579
580 let re = RunEnv::Service(Some(self.svcname.clone()));
581
582 match st {
583 SrvAppRt::Sync {
584 svcevt_handler,
585 rt_handler
586 } => rttype::sync_main(rttype::SyncMainParams {
587 re,
588 svcevt_handler,
589 rt_handler,
590 sr,
591 svcevt_ch: None,
592 passthrough_init: self.passthrough_init,
593 passthrough_term: self.passthrough_term,
594 test_mode: self.test_mode
595 }),
596 SrvAppRt::Tokio {
597 rtbldr,
598 svcevt_handler,
599 rt_handler
600 } => rttype::tokio_main(
601 rtbldr,
602 rttype::TokioMainParams {
603 re,
604 svcevt_handler,
605 rt_handler,
606 sr,
607 svcevt_ch: None,
608 passthrough_init: self.passthrough_init,
609 passthrough_term: self.passthrough_term
610 }
611 ),
612 #[cfg(feature = "rocket")]
613 SrvAppRt::Rocket {
614 svcevt_handler,
615 rt_handler
616 } => rttype::rocket_main(rttype::RocketMainParams {
617 re,
618 svcevt_handler,
619 rt_handler,
620 sr,
621 svcevt_ch: None,
622 passthrough_init: self.passthrough_init,
623 passthrough_term: self.passthrough_term
624 })
625 }
626 }
627
628 /// Run as a Windows service.
629 #[cfg(windows)]
630 fn winsvc<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
631 where
632 ApEr: Send + 'static + std::fmt::Debug
633 {
634 winsvc::run(
635 &self.svcname,
636 st,
637 self.passthrough_init,
638 self.passthrough_term
639 )?;
640
641 Ok(())
642 }
643
644 /// Run as a foreground server
645 fn foreground<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
646 where
647 ApEr: Send + std::fmt::Debug
648 {
649 LumberJack::default().set_init(self.log_init).init()?;
650
651 tracing::debug!("Running service '{}'", self.svcname);
652
653 let sr = nosvc::ServiceReporter {};
654 let sr = ServiceReporter::new(sr);
655
656 match st {
657 SrvAppRt::Sync {
658 svcevt_handler,
659 rt_handler
660 } => rttype::sync_main(rttype::SyncMainParams {
661 re: RunEnv::Foreground,
662 svcevt_handler,
663 rt_handler,
664 sr,
665 svcevt_ch: None,
666 passthrough_init: self.passthrough_init,
667 passthrough_term: self.passthrough_term,
668 test_mode: self.test_mode
669 }),
670
671 #[cfg(feature = "tokio")]
672 SrvAppRt::Tokio {
673 rtbldr,
674 svcevt_handler,
675 rt_handler
676 } => rttype::tokio_main(
677 rtbldr,
678 rttype::TokioMainParams {
679 re: RunEnv::Foreground,
680 svcevt_handler,
681 rt_handler,
682 sr,
683 svcevt_ch: None,
684 passthrough_init: self.passthrough_init,
685 passthrough_term: self.passthrough_term
686 }
687 ),
688
689 #[cfg(feature = "rocket")]
690 SrvAppRt::Rocket {
691 svcevt_handler,
692 rt_handler
693 } => rttype::rocket_main(rttype::RocketMainParams {
694 re: RunEnv::Foreground,
695 svcevt_handler,
696 rt_handler,
697 sr,
698 svcevt_ch: None,
699 passthrough_init: self.passthrough_init,
700 passthrough_term: self.passthrough_term
701 })
702 }
703 }
704}
705
impl RunCtx {
  /// Create a new service running context.
  #[must_use]
  pub fn new(name: &str) -> Self {
    Self {
      service: false,
      svcname: name.into(),
      log_init: true,
      passthrough_init: HashMap::new(),
      passthrough_term: HashMap::new(),
      test_mode: false
    }
  }

  /// Set an initialization passthrough buffer.
  ///
  /// The `data` can be extracted in the service handler's `init()` method by
  /// calling [`InitCtx::take()`] (or borrowed via [`InitCtx::get()`]).
  #[must_use]
  pub fn init_passthrough<T>(mut self, data: T) -> Self
  where
    T: Send + Sync + 'static
  {
    self.init_passthrough_r(data);
    self
  }

  /// Same as [`RunCtx::init_passthrough()`], but works with reference to
  /// `Self`.
  pub fn init_passthrough_r<T>(&mut self, data: T) -> &mut Self
  where
    T: Send + Sync + 'static
  {
    self
      .passthrough_init
      .insert(TypeId::of::<T>(), Box::new(data));
    self
  }

  /// Set a termination passthrough buffer.
  ///
  /// The `data` can be extracted in the service handler's `shutdown()` method
  /// by calling [`TermCtx::take()`] (or borrowed via [`TermCtx::get()`]).
  #[must_use]
  pub fn term_passthrough<T>(mut self, data: T) -> Self
  where
    T: Send + Sync + 'static
  {
    self.term_passthrough_r(data);
    self
  }

  /// Same as [`RunCtx::term_passthrough()`], but works with reference to
  /// `Self`.
  pub fn term_passthrough_r<T>(&mut self, data: T) -> &mut Self
  where
    T: Send + Sync + 'static
  {
    self
      .passthrough_term
      .insert(TypeId::of::<T>(), Box::new(data));
    self
  }

  /// Enable test mode.
  ///
  /// This method is intended for tests only.
  ///
  /// qsu performs a few global initialization that will fail if run repeatedly
  /// within the same process. This causes some problem when running tests,
  /// because rust may run tests in threads within the same process.
  #[doc(hidden)]
  #[must_use]
  pub const fn test_mode(mut self) -> Self {
    // Test mode implies no logging initialization (see `log_init()`).
    self.log_init = false;
    self.test_mode = true;
    self
  }

  /// Disable logging/tracing initialization.
  ///
  /// This is useful in tests because tests may run in different threads within
  /// the same process, causing the log/tracing initialization to panic.
  #[doc(hidden)]
  #[must_use]
  pub const fn log_init(mut self, flag: bool) -> Self {
    self.log_init = flag;
    self
  }

  /// Reference version of [`RunCtx::log_init()`].
  #[doc(hidden)]
  pub const fn log_init_ref(&mut self, flag: bool) -> &mut Self {
    self.log_init = flag;
    self
  }

  /// Mark this run context to run under the operating system's subservice, if
  /// one is available on this platform.
  #[must_use]
  pub const fn service(mut self) -> Self {
    self.service = true;
    self
  }

  /// Mark this run context to run under the operating system's subservice, if
  /// one is available on this platform.
  pub const fn service_ref(&mut self) -> &mut Self {
    self.service = true;
    self
  }

  /// Return whether this context has been marked to run as a service.
  #[must_use]
  pub const fn is_service(&self) -> bool {
    self.service
  }

  /// Launch the application.
  ///
  /// If this `RunCtx` has been marked as a _service_ then it will perform the
  /// appropriate service subsystem integration before running the actual
  /// server application code.
  ///
  /// This function must only be called from the main thread of the process,
  /// and must be called before any other threads are started.
  ///
  /// # Errors
  /// [`CbErr::App`] is returned, containing application-specific error, if an
  /// application callback returned an error. [`CbErr::Lib`] indicates that an
  /// error occurred in the qsu runtime.
  pub fn run<ApEr>(self, st: SrvAppRt<ApEr>) -> Result<(), CbErr<ApEr>>
  where
    ApEr: Send + 'static + std::fmt::Debug
  {
    if self.service {
      // Record the launch mode for `runas()`; ignore a second set attempt.
      let _ = RUNAS.set(RunAs::SvcSubsys);

      #[cfg(all(target_os = "linux", feature = "systemd"))]
      self.systemd(st)?;

      #[cfg(windows)]
      self.winsvc(st)?;

      // ToDo: We should check for other platforms here (like macOS/launchd)
    } else {
      let _ = RUNAS.set(RunAs::Foreground);

      // Do not run against any specific service subsystem. Despite its name
      // this isn't necessarily running as a foreground process; some service
      // subsystems do not make a distinction. Perhaps a better mental model
      // is that certain service subsystems expects to run regular "foreground"
      // processes.
      self.foreground(st)?;
    }

    Ok(())
  }

  /// Convenience method around [`Self::run()`] using [`SrvAppRt::Sync`].
  #[allow(clippy::missing_errors_doc)]
  pub fn run_sync<ApEr>(
    self,
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    rt_handler: Box<dyn ServiceHandler<AppErr = ApEr> + Send>
  ) -> Result<(), CbErr<ApEr>>
  where
    ApEr: Send + 'static + std::fmt::Debug
  {
    self.run(SrvAppRt::Sync {
      svcevt_handler,
      rt_handler
    })
  }

  /// Convenience method around [`Self::run()`] using [`SrvAppRt::Tokio`].
  #[cfg(feature = "tokio")]
  #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))]
  #[allow(clippy::missing_errors_doc)]
  pub fn run_tokio<ApEr>(
    self,
    rtbldr: Option<runtime::Builder>,
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>
  ) -> Result<(), CbErr<ApEr>>
  where
    ApEr: Send + 'static + std::fmt::Debug
  {
    self.run(SrvAppRt::Tokio {
      rtbldr,
      svcevt_handler,
      rt_handler
    })
  }

  /// Convenience method around [`Self::run()`] using [`SrvAppRt::Rocket`].
  #[cfg(feature = "rocket")]
  #[cfg_attr(docsrs, doc(cfg(feature = "rocket")))]
  #[allow(clippy::missing_errors_doc)]
  pub fn run_rocket<ApEr>(
    self,
    svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
    rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>
  ) -> Result<(), CbErr<ApEr>>
  where
    ApEr: Send + 'static + std::fmt::Debug
  {
    self.run(SrvAppRt::Rocket {
      svcevt_handler,
      rt_handler
    })
  }
}
918
/// Internal thread used to run service event handler.
///
/// Receives [`SvcEvt`] messages from the runtime's broadcast channel and
/// invokes the application's event callback for each one, exiting on
/// [`SvcEvt::Shutdown`].
//#[tracing::instrument(name = "svcevtthread", skip_all)]
fn svcevt_thread(
  mut rx: broadcast::Receiver<SvcEvt>,
  mut evt_handler: Box<dyn FnMut(SvcEvt) + Send>
) {
  // NOTE(review): any recv error ends the loop -- for a broadcast channel
  // this includes a `Lagged` overflow, not just channel closure; confirm
  // that dropping lagged events (and exiting) is intended.
  while let Ok(msg) = rx.blocking_recv() {
    tracing::debug!("Received {:?}", msg);

    #[cfg(all(target_os = "linux", feature = "systemd"))]
    if matches!(msg, SvcEvt::ReloadConf) {
      //
      // Reload has been requested -- report RELOADING=1 and MONOTONIC_USEC to
      // systemd before calling the application callback.
      //
      let ts =
        nix::time::clock_gettime(nix::time::ClockId::CLOCK_MONOTONIC).unwrap();
      // MONOTONIC_USEC is the monotonic clock expressed in microseconds:
      // whole seconds concatenated with the zero-padded microsecond part.
      let s = format!(
        "RELOADING=1\nMONOTONIC_USEC={}{:06}",
        ts.tv_sec(),
        ts.tv_nsec() / 1000
      );
      tracing::trace!("Sending notification to systemd: {}", s);

      let custom = NotifyState::Custom(&s);
      if let Err(e) = sd_notify::notify(&[custom]) {
        log::error!("Unable to send RELOADING=1 notification to systemd; {e}");
      }
    }

    //
    // Call the application callback
    //
    evt_handler(msg);

    #[cfg(all(target_os = "linux", feature = "systemd"))]
    if matches!(msg, SvcEvt::ReloadConf) {
      //
      // This is a reload; report READY=1 to systemd after the application
      // callback has been called.
      //
      tracing::trace!("Sending notification to systemd: READY=1");
      if let Err(e) = sd_notify::notify(&[NotifyState::Ready]) {
        log::error!("Unable to send READY=1 notification to systemd; {e}");
      }
    }

    // If the event message was either shutdown or terminate, break out of loop
    // so the thread will terminate
    if let SvcEvt::Shutdown(_) = msg {
      tracing::debug!("Terminating thread");
      // break out of loop when the service shutdown has been requested
      break;
    }
  }
}
975
976// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :