Skip to main content

qsu/rt/rttype/
tokio.rs

1use std::{
2  any::{Any, TypeId},
3  sync::{Arc, atomic::AtomicU32},
4  thread
5};
6
7use hashbrown::HashMap;
8
9use tokio::{runtime, sync::broadcast, task};
10
11use crate::{
12  err::{AppErrors, CbErr},
13  rt::{
14    Demise, InitCtx, RunEnv, ServiceReporter, SvcEvt, TermCtx,
15    TokioServiceHandler, signals
16  }
17};
18
19use killswitch::KillSwitch;
20
21#[cfg(unix)]
22use crate::rt::UserSig;
23
24
25pub struct MainParams<ApEr> {
26  pub(crate) re: RunEnv,
27  pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
28  pub(crate) rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>,
29  pub(crate) sr: ServiceReporter,
30  pub(crate) svcevt_ch:
31    Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
32  pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
33  pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
34}
35
36
37/// Internal `main()`-like routine for server applications that run the tokio
38/// runtime for `async` code.
39pub fn main<ApEr>(
40  rtbldr: Option<runtime::Builder>,
41  params: MainParams<ApEr>
42) -> Result<(), CbErr<ApEr>>
43where
44  ApEr: Send + std::fmt::Debug
45{
46  let rt = if let Some(mut bldr) = rtbldr {
47    bldr.build()?
48  } else {
49    tokio::runtime::Runtime::new()?
50  };
51  rt.block_on(async_main(params))?;
52
53  Ok(())
54}
55
56/// The `async` main function for tokio servers.
57///
58/// If `rx_svcevt` is `Some(_)` it means the channel was created elsewhere
59/// (implied: The transmitting endpoint lives somewhere else).  If it is `None`
60/// the channel needs to be created.
61async fn async_main<ApEr>(
62  MainParams {
63    re,
64    svcevt_handler,
65    mut rt_handler,
66    sr,
67    svcevt_ch,
68    passthrough_init,
69    mut passthrough_term
70  }: MainParams<ApEr>
71) -> Result<(), CbErr<ApEr>>
72where
73  ApEr: Send + std::fmt::Debug
74{
75  let ks = KillSwitch::new();
76
77  let start_ts = jiff::Timestamp::now();
78
79  // If a SvcEvt receiver end-point was handed to us, then use it.  Otherwise
80  // create our own and spawn the monitoring tasks that will generate events
81  // for it.
82  let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
83  {
84    (tx_svcevt, rx_svcevt)
85  } else {
86    init_svc_channels(&ks)
87  };
88
89  // Special implicit checkpoint 0
90  sr.starting(0, Some(super::SVCAPP_INIT_MSG));
91
92  // Call application's init() method.
93  let mut ictx = InitCtx {
94    re: re.clone(),
95    sr: sr.clone(),
96    cnt: Arc::new(AtomicU32::new(1)), // 0 used, start at 1
97    passthrough: passthrough_init,
98    passthrough_term: HashMap::new()
99  };
100  let init_apperr = super::timeit_async("initialization", || async {
101    rt_handler.init(&mut ictx).await.err()
102  })
103  .await;
104
105  // The application init may have added termctx passthrough data.  Transfer
106  // them to the passthrough term map.
107  for (k, v) in ictx.passthrough_term.drain() {
108    passthrough_term.insert(k, v);
109  }
110
111  // Drop InitCtx.
112  //
113  // This will trigger report to the service subsystem that the init phase is
114  // done.
115  drop(ictx);
116
117  if let Some(ref e) = init_apperr {
118    tracing::error!("Service handler init() failed; {e:?}");
119  }
120
121  let run_apperr = if init_apperr.is_none() {
122    sr.started();
123
124    // Kick off service event monitoring thread before running main app
125    // callback
126    let jh = thread::Builder::new()
127      .name("svcevt".into())
128      .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
129      .unwrap();
130
131    // Run the main service application callback.
132    //
133    // This is basically the service application's "main()".
134    let ret = super::timeit_async("main", || async {
135      rt_handler.run(&re).await.err()
136    })
137    .await;
138
139    if let Some(ref e) = ret {
140      tracing::error!("Service handler run() failed; {e:?}");
141    }
142
143    // Shut down svcevent thread
144    //
145    // Tell it that an (implicit) shutdown event has occurred.
146    // Duplicates don't matter, because once the first one is processed the
147    // thread will terminate.
148    let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));
149
150    // Wait for service event monitoring thread to terminate
151    let _ = task::spawn_blocking(|| jh.join()).await;
152
153    ret
154  } else {
155    None
156  };
157
158  // Special implicit checkpoint 0
159  //
160  // Always send the first shutdown checkpoint here.  Either init() failed or
161  // run retuned.  Either way, we're shutting down.
162  sr.stopping(0, Some(super::SVCAPP_TERM_MSG));
163
164  // Now that the main application has terminated kill off any remaining
165  // auxiliary tasks (read: signal waiters)
166  ks.trigger();
167
168  if (ks.finalize().await).is_err() {
169    log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
170  }
171
172  // Call the application's shutdown() function.
173  let mut tctx = TermCtx {
174    re,
175    sr: sr.clone(),
176    cnt: Arc::new(AtomicU32::new(1)), // 0 used above, so start at 1
177    passthrough: passthrough_term
178  };
179  let term_apperr = super::timeit_async("termination", || async {
180    rt_handler.shutdown(&mut tctx).await.err()
181  })
182  .await;
183
184  // Drop TermCtx.
185  //
186  // This will trigger report to the service subsystem that the term phase is
187  // done.
188  drop(tctx);
189
190  if let Some(ref e) = term_apperr {
191    tracing::error!("Service handler shutdown() failed; {e:?}");
192  }
193
194  // Inform the service subsystem that the the shutdown is complete
195  sr.stopped();
196
197  log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));
198
199  // There can be multiple failures, and we don't want to lose information
200  // about what went wrong, so return an error context that can contain all
201  // callback errors.
202  if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
203    let apperrs = AppErrors {
204      init: init_apperr,
205      run: run_apperr,
206      shutdown: term_apperr
207    };
208    Err(CbErr::SrvApp(apperrs))?;
209  }
210
211  Ok(())
212}
213
214fn init_svc_channels(
215  ks: &KillSwitch
216) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
217  // Create channel used to signal events to application
218  let (tx, rx) = broadcast::channel(16);
219
220  let ks2 = ks.clone();
221
222  // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event.
223  let txc = tx.clone();
224  task::spawn(signals::wait_shutdown(
225    move || {
226      if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
227        log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
228      }
229    },
230    ks2
231  ));
232
233  // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a
234  // Terminate event.
235  let txc = tx.clone();
236  let ks2 = ks.clone();
237  task::spawn(signals::wait_term(
238    move || {
239      let svcevt = SvcEvt::Shutdown(Demise::Terminated);
240      if let Err(e) = txc.send(svcevt) {
241        log::error!("Unable to send SvcEvt::Terminate event; {e}");
242      }
243    },
244    ks2
245  ));
246
247  // There doesn't seem to be anything equivalent to SIGHUP for Windows
248  // (Services)
249  #[cfg(unix)]
250  {
251    let ks2 = ks.clone();
252
253    let txc = tx.clone();
254    task::spawn(signals::wait_reload(
255      move || {
256        if let Err(e) = txc.send(SvcEvt::ReloadConf) {
257          log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
258        }
259      },
260      ks2
261    ));
262  }
263
264  #[cfg(unix)]
265  {
266    let ks2 = ks.clone();
267
268    let txc = tx.clone();
269    task::spawn(signals::wait_user1(
270      move || {
271        if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
272          log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
273        }
274      },
275      ks2
276    ));
277  }
278
279  #[cfg(unix)]
280  {
281    let ks2 = ks.clone();
282
283    let txc = tx.clone();
284    task::spawn(signals::wait_user2(
285      move || {
286        if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
287          log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
288        }
289      },
290      ks2
291    ));
292  }
293
294  (tx, rx)
295}
296
297// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :