//! qsu 0.10.1
//!
//! Service subsystem utilities and runtime wrapper.
//!
//! Documentation
use std::{
  any::{Any, TypeId},
  sync::{Arc, atomic::AtomicU32},
  thread
};

use hashbrown::HashMap;

use tokio::{runtime, sync::broadcast, task};

use crate::{
  err::{AppErrors, CbErr},
  rt::{
    Demise, InitCtx, RunEnv, ServiceReporter, SvcEvt, TermCtx,
    TokioServiceHandler, signals
  }
};

use killswitch::KillSwitch;

#[cfg(unix)]
use crate::rt::UserSig;


/// Bundled parameters for the internal [`main`]/[`async_main`] entry points.
///
/// Collects everything the runtime wrapper needs to drive a service
/// application's life-cycle: the run environment, the application's handler
/// callbacks, the service reporter, an optional pre-created service event
/// channel, and passthrough data maps for the init/term phases.
pub struct MainParams<ApEr> {
  // Run environment handed to the application's init()/run() callbacks.
  pub(crate) re: RunEnv,
  // Callback invoked (on a dedicated thread) for each service event.
  pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
  // The application's service handler (init/run/shutdown callbacks).
  pub(crate) rt_handler: Box<dyn TokioServiceHandler<AppErr = ApEr> + Send>,
  // Reporter used to signal life-cycle checkpoints to the service subsystem.
  pub(crate) sr: ServiceReporter,
  // If `Some`, a service event channel created elsewhere; if `None`,
  // `async_main` creates its own and spawns the signal monitoring tasks.
  pub(crate) svcevt_ch:
    Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
  // Type-keyed passthrough data made available to the init context.
  pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  // Type-keyed passthrough data made available to the term context.
  pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}


/// Internal `main()`-like routine for server applications that run the tokio
/// runtime for `async` code.
///
/// Builds a tokio runtime -- from `rtbldr` if one was supplied, otherwise a
/// default runtime -- and blocks on [`async_main`] until the service
/// application has fully terminated.
pub fn main<ApEr>(
  rtbldr: Option<runtime::Builder>,
  params: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>>
where
  ApEr: Send + std::fmt::Debug
{
  // Construct the runtime: honor the caller-supplied builder when present,
  // otherwise fall back to tokio's default runtime configuration.
  let rt = match rtbldr {
    Some(mut bldr) => bldr.build()?,
    None => tokio::runtime::Runtime::new()?
  };

  // Drive the async service main to completion on the runtime.
  rt.block_on(async_main(params))?;

  Ok(())
}

/// The `async` main function for tokio servers.
///
/// Drives the full service life-cycle: the handler's `init()`, `run()` and
/// `shutdown()` callbacks, reporting phase checkpoints to the service
/// subsystem along the way and collecting any callback errors into a single
/// [`AppErrors`] result.
///
/// If `svcevt_ch` is `Some(_)` it means the channel was created elsewhere
/// (implied: The transmitting endpoint lives somewhere else).  If it is `None`
/// the channel needs to be created, along with the signal monitoring tasks
/// that generate events for it.
async fn async_main<ApEr>(
  MainParams {
    re,
    svcevt_handler,
    mut rt_handler,
    sr,
    svcevt_ch,
    passthrough_init,
    mut passthrough_term
  }: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>>
where
  ApEr: Send + std::fmt::Debug
{
  // Used to terminate the auxiliary signal-waiter tasks once the main
  // application has returned.
  let ks = KillSwitch::new();

  // Remember when the service started so total run time can be logged at the
  // end.
  let start_ts = jiff::Timestamp::now();

  // If a SvcEvt receiver end-point was handed to us, then use it.  Otherwise
  // create our own and spawn the monitoring tasks that will generate events
  // for it.
  let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
  {
    (tx_svcevt, rx_svcevt)
  } else {
    init_svc_channels(&ks)
  };

  // Special implicit checkpoint 0
  sr.starting(0, Some(super::SVCAPP_INIT_MSG));

  // Call application's init() method.
  let mut ictx = InitCtx {
    re: re.clone(),
    sr: sr.clone(),
    cnt: Arc::new(AtomicU32::new(1)), // 0 used, start at 1
    passthrough: passthrough_init,
    passthrough_term: HashMap::new()
  };
  // Capture only the error (if any); success carries no payload.
  let init_apperr = super::timeit_async("initialization", || async {
    rt_handler.init(&mut ictx).await.err()
  })
  .await;

  // The application init may have added termctx passthrough data.  Transfer
  // them to the passthrough term map.
  for (k, v) in ictx.passthrough_term.drain() {
    passthrough_term.insert(k, v);
  }

  // Drop InitCtx.
  //
  // This will trigger report to the service subsystem that the init phase is
  // done.
  drop(ictx);

  if let Some(ref e) = init_apperr {
    tracing::error!("Service handler init() failed; {e:?}");
  }

  // Only call run() if init() succeeded; otherwise skip straight to the
  // shutdown phase.
  let run_apperr = if init_apperr.is_none() {
    sr.started();

    // Kick off service event monitoring thread before running main app
    // callback
    let jh = thread::Builder::new()
      .name("svcevt".into())
      .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
      .unwrap();

    // Run the main service application callback.
    //
    // This is basically the service application's "main()".
    let ret = super::timeit_async("main", || async {
      rt_handler.run(&re).await.err()
    })
    .await;

    if let Some(ref e) = ret {
      tracing::error!("Service handler run() failed; {e:?}");
    }

    // Shut down svcevent thread
    //
    // Tell it that an (implicit) shutdown event has occurred.
    // Duplicates don't matter, because once the first one is processed the
    // thread will terminate.
    let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));

    // Wait for service event monitoring thread to terminate.
    //
    // join() blocks, so hop over to a blocking task to avoid stalling the
    // async executor.
    let _ = task::spawn_blocking(|| jh.join()).await;

    ret
  } else {
    None
  };

  // Special implicit checkpoint 0
  //
  // Always send the first shutdown checkpoint here.  Either init() failed or
  // run() returned.  Either way, we're shutting down.
  sr.stopping(0, Some(super::SVCAPP_TERM_MSG));

  // Now that the main application has terminated kill off any remaining
  // auxiliary tasks (read: signal waiters)
  ks.trigger();

  if (ks.finalize().await).is_err() {
    log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
  }

  // Call the application's shutdown() function.
  //
  // This runs regardless of whether init()/run() succeeded, so the
  // application always gets a chance to clean up.
  let mut tctx = TermCtx {
    re,
    sr: sr.clone(),
    cnt: Arc::new(AtomicU32::new(1)), // 0 used above, so start at 1
    passthrough: passthrough_term
  };
  let term_apperr = super::timeit_async("termination", || async {
    rt_handler.shutdown(&mut tctx).await.err()
  })
  .await;

  // Drop TermCtx.
  //
  // This will trigger report to the service subsystem that the term phase is
  // done.
  drop(tctx);

  if let Some(ref e) = term_apperr {
    tracing::error!("Service handler shutdown() failed; {e:?}");
  }

  // Inform the service subsystem that the shutdown is complete
  sr.stopped();

  log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));

  // There can be multiple failures, and we don't want to lose information
  // about what went wrong, so return an error context that can contain all
  // callback errors.
  if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
    let apperrs = AppErrors {
      init: init_apperr,
      run: run_apperr,
      shutdown: term_apperr
    };
    Err(CbErr::SrvApp(apperrs))?;
  }

  Ok(())
}

fn init_svc_channels(
  ks: &KillSwitch
) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
  // Create channel used to signal events to application
  let (tx, rx) = broadcast::channel(16);

  let ks2 = ks.clone();

  // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event.
  let txc = tx.clone();
  task::spawn(signals::wait_shutdown(
    move || {
      if let Err(e) = txc.send(SvcEvt::Shutdown(Demise::Interrupted)) {
        log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
      }
    },
    ks2
  ));

  // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a
  // Terminate event.
  let txc = tx.clone();
  let ks2 = ks.clone();
  task::spawn(signals::wait_term(
    move || {
      let svcevt = SvcEvt::Shutdown(Demise::Terminated);
      if let Err(e) = txc.send(svcevt) {
        log::error!("Unable to send SvcEvt::Terminate event; {e}");
      }
    },
    ks2
  ));

  // There doesn't seem to be anything equivalent to SIGHUP for Windows
  // (Services)
  #[cfg(unix)]
  {
    let ks2 = ks.clone();

    let txc = tx.clone();
    task::spawn(signals::wait_reload(
      move || {
        if let Err(e) = txc.send(SvcEvt::ReloadConf) {
          log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
        }
      },
      ks2
    ));
  }

  #[cfg(unix)]
  {
    let ks2 = ks.clone();

    let txc = tx.clone();
    task::spawn(signals::wait_user1(
      move || {
        if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig1)) {
          log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
        }
      },
      ks2
    ));
  }

  #[cfg(unix)]
  {
    let ks2 = ks.clone();

    let txc = tx.clone();
    task::spawn(signals::wait_user2(
      move || {
        if let Err(e) = txc.send(SvcEvt::User(UserSig::Sig2)) {
          log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
        }
      },
      ks2
    ));
  }

  (tx, rx)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :