qsu 0.10.1

Service subsystem utilities and runtime wrapper.
Documentation
use std::{
  any::{Any, TypeId},
  sync::{Arc, atomic::AtomicU32},
  thread
};

use hashbrown::HashMap;

use tokio::sync::broadcast;

use crate::{
  err::{AppErrors, CbErr},
  rt::{
    Demise, InitCtx, RunEnv, ServiceHandler, ServiceReporter, SvcEvt, TermCtx,
    signals
  }
};

#[cfg(unix)]
use crate::rt::{UserSig, signals::SigType};

pub struct MainParams<ApEr> {
  pub(crate) re: RunEnv,
  pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
  pub(crate) rt_handler: Box<dyn ServiceHandler<AppErr = ApEr>>,
  pub(crate) sr: ServiceReporter,
  pub(crate) svcevt_ch:
    Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
  pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  pub(crate) test_mode: bool
}

/// Internal `main()`-like routine for server applications that run plain old
/// non-`async` code.
pub fn main<ApEr>(
  MainParams {
    re,
    svcevt_handler,
    mut rt_handler,
    sr,
    svcevt_ch,
    passthrough_init,
    mut passthrough_term,
    test_mode
  }: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>> {
  // Get rid of unused variable warning
  #[cfg(unix)]
  let _ = test_mode;

  let start_ts = jiff::Timestamp::now();

  // If a channel was passed from caller, then use it.  The assumption is
  // that there's some other runtime that is monitoring for system
  // (service-related) events that is holding a sending end-point.
  //
  // Otherwise create a new unbounded channel and kick off the appropriate
  // system event monitoring.
  let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
  {
    // Use the broadcast receiver supplied by caller (it likely originates from
    // a service runtime integration).
    (tx_svcevt, rx_svcevt)
  } else {
    let (tx, rx) = broadcast::channel(16);

    let tx2 = tx.clone();

    #[cfg(unix)]
    signals::sync_sigmon(move |st| match st {
      SigType::Usr1 => {
        if let Err(e) = tx2.send(SvcEvt::User(UserSig::Sig1)) {
          log::error!("Unable to send SvcEvt::Info event; {e}",);
        }
      }
      SigType::Usr2 => {
        if let Err(e) = tx2.send(SvcEvt::User(UserSig::Sig2)) {
          log::error!("Unable to send SvcEvt::Info event; {e}");
        }
      }
      SigType::Int => {
        if let Err(e) = tx2.send(SvcEvt::Shutdown(Demise::Interrupted)) {
          log::error!("Unable to send SvcEvt::Shutdown event; {e}");
        }
      }
      SigType::Term => {
        if let Err(e) = tx2.send(SvcEvt::Shutdown(Demise::Terminated)) {
          log::error!("Unable to send SvcEvt::Terminate event; {e}");
        }
      }
      SigType::Hup => {
        if let Err(e) = tx2.send(SvcEvt::ReloadConf) {
          log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
        }
      }
    })?;

    // On Windows, if rx_svcevt is None, means we're not running under the
    // service subsystem (i.e. we're running as a foreground process), so
    // register a Ctrl+C handler.
    #[cfg(windows)]
    signals::sync_kill_to_event(tx2, test_mode)?;

    (tx, rx)
  };

  // Special implicit checkpoint 0
  sr.starting(0, Some(super::SVCAPP_INIT_MSG));

  // Call server application's init() method, passing along a startup state
  // reporter object.
  let mut ictx = InitCtx {
    re: re.clone(),
    sr: sr.clone(),
    cnt: Arc::new(AtomicU32::new(1)), // 0 used, start at 1
    passthrough: passthrough_init,
    passthrough_term: HashMap::new()
  };

  // Call service handler's init(), and time its execution
  let init_apperr =
    super::timeit("initialization", || rt_handler.init(&mut ictx).err());

  // The application init may have added termctx passthrough data.  Transfer
  // them to the passthrough term map.
  for (k, v) in ictx.passthrough_term.drain() {
    passthrough_term.insert(k, v);
  }

  // Drop InitCtx.
  //
  // This will trigger report to the service subsystem that the init phase is
  // done.
  drop(ictx);

  // If init() was successful, set the service's state to "started" and then
  // call the server application's run() method.
  let run_apperr = if init_apperr.is_none() {
    sr.started();

    // Kick off service event monitoring thread before running main app
    // callback
    let jh = thread::Builder::new()
      .name("svcevt".into())
      .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
      .unwrap();

    // Run the main service application callback.
    //
    // This is basically the service application's "main()".
    let ret = super::timeit("main", || rt_handler.run(&re).err());

    // Shut down svcevent thread
    //
    // Tell it that an (implicit) shutdown event has occurred.
    // Duplicates don't matter, because once the first one is processed the
    // thread will terminate.
    let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));

    // Wait for service event monitoring thread to terminate
    let _ = jh.join();

    ret
  } else {
    None
  };

  // Special implicit checkpoint 0
  //
  // Always send the first shutdown checkpoint here.  Either init() failed or
  // run returned.  Either way, we're shutting down.
  sr.stopping(0, Some(super::SVCAPP_TERM_MSG));

  // Call the application's shutdown() function, passing along a shutdown state
  // reporter object.
  let mut tctx = TermCtx {
    re,
    sr: sr.clone(),
    cnt: Arc::new(AtomicU32::new(1)), // 0 used, start at 1
    passthrough: passthrough_term
  };
  let term_apperr =
    super::timeit("termination", || rt_handler.shutdown(&mut tctx).err());

  // Drop TermCtx.
  //
  // This will trigger report to the service subsystem that the term phase is
  // done.
  drop(tctx);

  // Inform the service subsystem that the the shutdown is complete
  sr.stopped();

  log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));

  // There can be multiple failures, and we don't want to lose information
  // about what went wrong, so return an error context that can contain all
  // callback errors.
  if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
    let apperrs = AppErrors {
      init: init_apperr,
      run: run_apperr,
      shutdown: term_apperr
    };
    Err(CbErr::SrvApp(apperrs))?;
  }

  Ok(())
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :