Skip to main content

qsu/rt/rttype/
rocket.rs

1//! Rocket runtime module.
2//!
3//! While Rocket uses tokio, it [Rocket] insists on initializing tokio for
4//! itself.  Attempting to use the `TokioServiceHandler` will cause `Rocket`s
5//! to issue a warning at startup.
6//!
7//! As a convenience _qsu_ can keep track of rockets and automatically shut
8//! them down once the service subsystem requests a shutdown.  To use this
9//! feature, the server application should return a `Vec<Rocket<Build>>` from
10//! `RocketServiceHandler::init()`.  Any `Rocket` instance in this vec will be
11//! ignited before being passed to `RocketServiceHandler::run()`.
12//!
13//! Server applications do not need to use this feature and should return an
14//! empty vector from `init()` in this case.  This also requires the
15//! application code to trigger a shutdown of each instance itself.
16
17use std::{
18  any::{Any, TypeId},
19  sync::{Arc, atomic::AtomicU32},
20  thread
21};
22
23use hashbrown::HashMap;
24
25use tokio::{sync::broadcast, task};
26
27use killswitch::KillSwitch;
28
29use crate::{
30  err::{AppErrors, CbErr},
31  rt::{
32    Demise, InitCtx, RocketServiceHandler, RunEnv, ServiceReporter, SvcEvt,
33    TermCtx, signals
34  }
35};
36
37#[cfg(unix)]
38use crate::rt::UserSig;
39
40
41pub struct MainParams<ApEr>
42where
43  ApEr: Send
44{
45  pub(crate) re: RunEnv,
46  pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
47  pub(crate) rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>,
48  pub(crate) sr: ServiceReporter,
49  pub(crate) svcevt_ch:
50    Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
51  pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
52  pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
53}
54
55/// Internal `main()`-like routine for server applications that run one or more
56/// Rockets as their main application.
57pub fn main<ApEr>(params: MainParams<ApEr>) -> Result<(), CbErr<ApEr>>
58where
59  ApEr: Send
60{
61  rocket::execute(rocket_async_main(params))?;
62
63  Ok(())
64}
65
66
67#[expect(clippy::too_many_lines)]
68async fn rocket_async_main<ApEr>(
69  MainParams {
70    re,
71    svcevt_handler,
72    mut rt_handler,
73    sr,
74    svcevt_ch,
75    passthrough_init,
76    mut passthrough_term
77  }: MainParams<ApEr>
78) -> Result<(), CbErr<ApEr>>
79where
80  ApEr: Send
81{
82  let ks = KillSwitch::new();
83
84  let start_ts = jiff::Timestamp::now();
85
86  // If a SvcEvt receiver end-point was handed to us, then use it.  The
87  // presumption is that there's a service subsystem somewhere that has
88  // created the channel and is holding one of the end-points.
89  //
90  // Otherwise create our own channel and spawn the monitoring tasks that will
91  // generate events for it.
92  let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
93  {
94    (tx_svcevt, rx_svcevt)
95  } else {
96    init_svc_channels(&ks)
97  };
98
99  // Special implicit checkpoint 0
100  sr.starting(0, Some(super::SVCAPP_INIT_MSG));
101
102  // Call application's init() method.
103  let mut ictx = InitCtx {
104    re: re.clone(),
105    sr: sr.clone(),
106    cnt: Arc::new(AtomicU32::new(1)), // 0 used, start at 1
107    passthrough: passthrough_init,
108    passthrough_term: HashMap::new()
109  };
110  let (rockets, init_apperr) =
111    super::timeit_async("initialization", || async {
112      match rt_handler.init(&mut ictx).await {
113        Ok(rockets) => (rockets, None),
114        Err(e) => (Vec::new(), Some(e))
115      }
116    })
117    .await;
118
119  // The application init may have added termctx passthrough data.  Transfer
120  // them to the passthrough term map.
121  for (k, v) in ictx.passthrough_term.drain() {
122    passthrough_term.insert(k, v);
123  }
124
125  // Drop InitCtx.
126  //
127  // This will trigger report to the service subsystem that the init phase is
128  // done.
129  drop(ictx);
130
131  // Ignite rockets so we can get Shutdown contexts for each of the instances.
132  // There are two cases where the rockets vector will be empty:
133  // - if init() returned error.
134  // - the application doesn't want to use the built-in rocket shutdown
135  //   support.
136  let mut ignited = vec![];
137  let mut rocket_shutdowns = vec![];
138  for rocket in rockets {
139    let rocket = rocket.ignite().await?;
140    rocket_shutdowns.push(rocket.shutdown());
141    ignited.push(rocket);
142  }
143
144  // If init() was successful, then do some prepartations and then run the
145  // application run() callback.
146  let run_apperr = if init_apperr.is_none() {
147    // Set the service's state to "started"
148    sr.started();
149
150    let mut rx_svcevt2 = rx_svcevt.resubscribe();
151
152    // Launch a task that waits for the SvtEvt::Shutdown event.   Once it
153    // arrives, tell all rocket instances to gracefully shutdown.
154    //
155    // Note: We don't want to use the killswitch for this because the
156    // killswitch isn't triggered until run() has returned.
157    let jh_graceful_landing = task::spawn(async move {
158      loop {
159        let msg = match rx_svcevt2.recv().await {
160          Ok(msg) => msg,
161          Err(e) => {
162            log::error!("Unable to receive broadcast SvcEvt message, {e}");
163            break;
164          }
165        };
166        if let SvcEvt::Shutdown(_) = msg {
167          tracing::trace!("Ask rocket instances to shut down gracefully");
168          for shutdown in rocket_shutdowns {
169            // Tell this rocket instance to shut down gracefully.
170            shutdown.notify();
171          }
172          break;
173        }
174        tracing::trace!("Ignored message in task waiting for shutdown");
175      }
176    });
177
178    // Kick off service event monitoring thread before running main app
179    // callback
180    let jh = thread::Builder::new()
181      .name("svcevt".into())
182      .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
183      .unwrap();
184
185    // Run the main service application callback.
186    //
187    // This is basically the service application's "main()".
188    let res = super::timeit_async("main", || async {
189      rt_handler.run(ignited, &re).await.err()
190    })
191    .await;
192
193    // Shut down svcevent thread
194    //
195    // Tell it that an (implicit) shutdown event has occurred.
196    // Duplicates don't matter, because once the first one is processed the
197    // thread will terminate.
198    let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
199
200    // Wait for all task that is waiting for a shutdown event to complete
201    if let Err(e) = jh_graceful_landing.await {
202      log::warn!("An error was returned from the graceful landing task; {e}");
203    }
204
205    // Wait for service event monitoring thread to terminate
206    let _ = task::spawn_blocking(|| jh.join()).await;
207
208    res
209  } else {
210    None
211  };
212
213  // Special implicit checkpoint 0
214  //
215  // Always send the first shutdown checkpoint here.  Either init() failed or
216  // run retuned.  Either way, we're shutting down.
217  sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
218
219  // Now that the main application has terminated kill off any remaining
220  // auxiliary tasks (read: signal waiters)
221  ks.trigger();
222  if (ks.finalize().await).is_err() {
223    log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
224  }
225
226  // Call the application's shutdown() function.
227  let mut tctx = TermCtx {
228    re,
229    sr: sr.clone(),
230    cnt: Arc::new(AtomicU32::new(1)), // 0 used above, so start at 1
231    passthrough: passthrough_term
232  };
233  let term_apperr = super::timeit_async("termination", || async {
234    rt_handler.shutdown(&mut tctx).await.err()
235  })
236  .await;
237
238  // Drop TermCtx.
239  //
240  // This will trigger report to the service subsystem that the term phase is
241  // done.
242  drop(tctx);
243
244  // Inform the service subsystem that the the shutdown is complete
245  sr.stopped();
246
247  log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
248
249  // There can be multiple failures, and we don't want to lose information
250  // about what went wrong, so return an error context that can contain all
251  // callback errors.
252  if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
253    let apperrs = AppErrors {
254      init: init_apperr,
255      run: run_apperr,
256      shutdown: term_apperr
257    };
258    Err(CbErr::SrvApp(apperrs))?;
259  }
260
261  Ok(())
262}
263
264fn init_svc_channels(
265  ks: &KillSwitch
266) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
267  // Create channel used to signal events to application
268  let (tx, rx) = broadcast::channel(16);
269
270  // ToDo: autoclone
271  let ks2 = ks.clone();
272
273  // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event.
274  // ToDo: autoclone
275  let txc = tx.clone();
276  task::spawn(signals::wait_shutdown(
277    move || {
278      if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
279        log::error!("Unable to send SvcEvt::Shutdown event; {e}");
280      }
281    },
282    ks2
283  ));
284
285  // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a
286  // Terminate event.
287  // ToDo: autoclone
288  let txc = tx.clone();
289  // ToDo: autoclone
290  let ks2 = ks.clone();
291  task::spawn(signals::wait_term(
292    move || {
293      if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Terminated)) {
294        log::error!("Unable to send SvcEvt::Terminate event; {e}");
295      }
296    },
297    ks2
298  ));
299
300  // There doesn't seem to be anything equivalent to SIGHUP for Windows
301  // (Services)
302  #[cfg(unix)]
303  {
304    // ToDo: autoclone
305    let ks2 = ks.clone();
306
307    // ToDo: autoclone
308    let txc = tx.clone();
309    task::spawn(signals::wait_reload(
310      move || {
311        if let Err(e) = txc.send(SvcEvt::ReloadConf) {
312          log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
313        }
314      },
315      ks2
316    ));
317  }
318
319  #[cfg(unix)]
320  {
321    let ks2 = ks.clone();
322
323    let txc = tx.clone();
324    task::spawn(signals::wait_user1(
325      move || {
326        if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
327          log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
328        }
329      },
330      ks2
331    ));
332  }
333
334  #[cfg(unix)]
335  {
336    let ks2 = ks.clone();
337
338    let txc = tx.clone();
339    task::spawn(signals::wait_user2(
340      move || {
341        if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
342          log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
343        }
344      },
345      ks2
346    ));
347  }
348
349  (tx, rx)
350}
351
352// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :