// qsu 0.10.1
//
// Service subsystem utilities and runtime wrapper.
//! Rocket runtime module.
//!
//! While Rocket uses tokio, Rocket insists on initializing tokio for itself.
//! Attempting to use the `TokioServiceHandler` will cause `Rocket`s to issue
//! a warning at startup.
//!
//! As a convenience _qsu_ can keep track of rockets and automatically shut
//! them down once the service subsystem requests a shutdown.  To use this
//! feature, the server application should return a `Vec<Rocket<Build>>` from
//! `RocketServiceHandler::init()`.  Any `Rocket` instance in this vec will be
//! ignited before being passed to `RocketServiceHandler::run()`.
//!
//! Server applications that do not want to use this feature should return an
//! empty vector from `init()`.  Doing so also requires the application code
//! to trigger a shutdown of each `Rocket` instance itself.

use std::{
  any::{Any, TypeId},
  sync::{Arc, atomic::AtomicU32},
  thread
};

use hashbrown::HashMap;

use tokio::{sync::broadcast, task};

use killswitch::KillSwitch;

use crate::{
  err::{AppErrors, CbErr},
  rt::{
    Demise, InitCtx, RocketServiceHandler, RunEnv, ServiceReporter, SvcEvt,
    TermCtx, signals
  }
};

#[cfg(unix)]
use crate::rt::UserSig;


/// Bundle of state handed by the runtime wrapper to [`main()`] for a
/// Rocket-based service application.
pub struct MainParams<ApEr>
where
  ApEr: Send
{
  /// Runtime environment passed to the application's `run()` callback.
  pub(crate) re: RunEnv,
  /// Callback invoked for each service event; runs on the dedicated
  /// "svcevt" thread.
  pub(crate) svcevt_handler: Box<dyn FnMut(SvcEvt) + Send>,
  /// The application's service handler (`init()`/`run()`/`shutdown()`).
  pub(crate) rt_handler: Box<dyn RocketServiceHandler<AppErr = ApEr> + Send>,
  /// Reports service state transitions (starting/started/stopping/stopped)
  /// to the service subsystem.
  pub(crate) sr: ServiceReporter,
  /// Optional pre-created service event channel.  If `None`, a channel and
  /// its signal-monitoring tasks are created by `init_svc_channels()`.
  pub(crate) svcevt_ch:
    Option<(broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>)>,
  /// Type-keyed passthrough data made available to the `init()` context.
  pub(crate) passthrough_init: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
  /// Type-keyed passthrough data made available to the `shutdown()` context.
  pub(crate) passthrough_term: HashMap<TypeId, Box<dyn Any + Send + Sync>>
}

/// Internal `main()`-like routine for server applications that run one or more
/// Rockets as their main application.
pub fn main<ApEr>(params: MainParams<ApEr>) -> Result<(), CbErr<ApEr>>
where
  ApEr: Send
{
  // Hand the async entry point to Rocket's executor wrapper; Rocket insists
  // on initializing tokio itself (see module docs), so `TokioServiceHandler`
  // style initialization is not used here.
  rocket::execute(rocket_async_main(params))
}


/// Async "main" driving the full service life-cycle for Rocket-based
/// applications: `init()`, rocket ignition, `run()` and `shutdown()`, with
/// checkpoints reported to the service subsystem at each phase boundary.
///
/// Errors from all three application callbacks are collected into
/// [`AppErrors`] and returned together so no failure information is lost.
#[expect(clippy::too_many_lines)]
async fn rocket_async_main<ApEr>(
  MainParams {
    re,
    svcevt_handler,
    mut rt_handler,
    sr,
    svcevt_ch,
    passthrough_init,
    mut passthrough_term
  }: MainParams<ApEr>
) -> Result<(), CbErr<ApEr>>
where
  ApEr: Send
{
  // Used to terminate the auxiliary signal-waiter tasks once the main
  // application has ended.
  let ks = KillSwitch::new();

  // Remember start time so total runtime can be logged at the end.
  let start_ts = jiff::Timestamp::now();

  // If a SvcEvt receiver end-point was handed to us, then use it.  The
  // presumption is that there's a service subsystem somewhere that has
  // created the channel and is holding one of the end-points.
  //
  // Otherwise create our own channel and spawn the monitoring tasks that will
  // generate events for it.
  let (tx_svcevt, rx_svcevt) = if let Some((tx_svcevt, rx_svcevt)) = svcevt_ch
  {
    (tx_svcevt, rx_svcevt)
  } else {
    init_svc_channels(&ks)
  };

  // Special implicit checkpoint 0
  sr.starting(0, Some(super::SVCAPP_INIT_MSG));

  // Call application's init() method.
  let mut ictx = InitCtx {
    re: re.clone(),
    sr: sr.clone(),
    cnt: Arc::new(AtomicU32::new(1)), // 0 used above, so start at 1
    passthrough: passthrough_init,
    passthrough_term: HashMap::new()
  };
  let (rockets, init_apperr) =
    super::timeit_async("initialization", || async {
      match rt_handler.init(&mut ictx).await {
        Ok(rockets) => (rockets, None),
        Err(e) => (Vec::new(), Some(e))
      }
    })
    .await;

  // The application init may have added termctx passthrough data.  Transfer
  // them to the passthrough term map.
  for (k, v) in ictx.passthrough_term.drain() {
    passthrough_term.insert(k, v);
  }

  // Drop InitCtx.
  //
  // This will trigger report to the service subsystem that the init phase is
  // done.
  drop(ictx);

  // Ignite rockets so we can get Shutdown contexts for each of the instances.
  // There are two cases where the rockets vector will be empty:
  // - if init() returned error.
  // - the application doesn't want to use the built-in rocket shutdown
  //   support.
  let mut ignited = vec![];
  let mut rocket_shutdowns = vec![];
  for rocket in rockets {
    let rocket = rocket.ignite().await?;
    rocket_shutdowns.push(rocket.shutdown());
    ignited.push(rocket);
  }

  // If init() was successful, then do some preparations and then run the
  // application run() callback.
  let run_apperr = if init_apperr.is_none() {
    // Set the service's state to "started"
    sr.started();

    let mut rx_svcevt2 = rx_svcevt.resubscribe();

    // Launch a task that waits for the SvcEvt::Shutdown event.   Once it
    // arrives, tell all rocket instances to gracefully shutdown.
    //
    // Note: We don't want to use the killswitch for this because the
    // killswitch isn't triggered until run() has returned.
    let jh_graceful_landing = task::spawn(async move {
      loop {
        let msg = match rx_svcevt2.recv().await {
          Ok(msg) => msg,
          Err(e) => {
            log::error!("Unable to receive broadcast SvcEvt message, {e}");
            break;
          }
        };
        if let SvcEvt::Shutdown(_) = msg {
          tracing::trace!("Ask rocket instances to shut down gracefully");
          for shutdown in rocket_shutdowns {
            // Tell this rocket instance to shut down gracefully.
            shutdown.notify();
          }
          break;
        }
        tracing::trace!("Ignored message in task waiting for shutdown");
      }
    });

    // Kick off service event monitoring thread before running main app
    // callback
    let jh = thread::Builder::new()
      .name("svcevt".into())
      .spawn(|| crate::rt::svcevt_thread(rx_svcevt, svcevt_handler))
      .unwrap();

    // Run the main service application callback.
    //
    // This is basically the service application's "main()".
    let res = super::timeit_async("main", || async {
      rt_handler.run(ignited, &re).await.err()
    })
    .await;

    // Shut down svcevent thread
    //
    // Tell it that an (implicit) shutdown event has occurred.
    // Duplicates don't matter, because once the first one is processed the
    // thread will terminate.
    let _ = tx_svcevt.send(SvcEvt::Shutdown(Demise::ReachedEnd));

    // Wait for the task that is waiting for a shutdown event to complete
    if let Err(e) = jh_graceful_landing.await {
      log::warn!("An error was returned from the graceful landing task; {e}");
    }

    // Wait for service event monitoring thread to terminate.  join() blocks,
    // so hand it to a blocking task rather than stalling the async runtime.
    let _ = task::spawn_blocking(|| jh.join()).await;

    res
  } else {
    None
  };

  // Special implicit checkpoint 0
  //
  // Always send the first shutdown checkpoint here.  Either init() failed or
  // run() returned.  Either way, we're shutting down.
  sr.stopping(0, Some(super::SVCAPP_TERM_MSG));

  // Now that the main application has terminated kill off any remaining
  // auxiliary tasks (read: signal waiters)
  ks.trigger();
  if (ks.finalize().await).is_err() {
    log::warn!("Attempted to finalize KillSwitch that wasn't triggered yet");
  }

  // Call the application's shutdown() function.
  let mut tctx = TermCtx {
    re,
    sr: sr.clone(),
    cnt: Arc::new(AtomicU32::new(1)), // 0 used above, so start at 1
    passthrough: passthrough_term
  };
  let term_apperr = super::timeit_async("termination", || async {
    rt_handler.shutdown(&mut tctx).await.err()
  })
  .await;

  // Drop TermCtx.
  //
  // This will trigger report to the service subsystem that the term phase is
  // done.
  drop(tctx);

  // Inform the service subsystem that the shutdown is complete
  sr.stopped();

  log::info!("Service ran for {:#}", super::rounded_elapse(start_ts));

  // There can be multiple failures, and we don't want to lose information
  // about what went wrong, so return an error context that can contain all
  // callback errors.
  if init_apperr.is_some() || run_apperr.is_some() || term_apperr.is_some() {
    let apperrs = AppErrors {
      init: init_apperr,
      run: run_apperr,
      shutdown: term_apperr
    };
    Err(CbErr::SrvApp(apperrs))?;
  }

  Ok(())
}

/// Create the service event broadcast channel and spawn the background tasks
/// that translate process signals into [`SvcEvt`] messages on it.
///
/// Each spawned waiter holds a clone of the kill switch so the runtime can
/// terminate it once the service is shutting down.
fn init_svc_channels(
  ks: &KillSwitch
) -> (broadcast::Sender<SvcEvt>, broadcast::Receiver<SvcEvt>) {
  // Channel used to deliver service events to the application.
  let (tx, rx) = broadcast::channel(16);

  // SIGINT (on unix) and Ctrl+C on Windows should trigger a Shutdown event.
  task::spawn(signals::wait_shutdown(
    {
      let tx = tx.clone();
      move || {
        if let Err(e) = tx.send(SvcEvt::Shutdown(Demise::Interrupted)) {
          log::error!("Unable to send SvcEvt::Shutdown event; {e}");
        }
      }
    },
    ks.clone()
  ));

  // SIGTERM (on unix) and Ctrl+Break/Close on Windows should trigger a
  // Terminate event.
  task::spawn(signals::wait_term(
    {
      let tx = tx.clone();
      move || {
        if let Err(e) = tx.send(SvcEvt::Shutdown(Demise::Terminated)) {
          log::error!("Unable to send SvcEvt::Terminate event; {e}");
        }
      }
    },
    ks.clone()
  ));

  // There doesn't seem to be anything equivalent to SIGHUP for Windows
  // (Services), so the reload waiter exists on unix only.
  #[cfg(unix)]
  task::spawn(signals::wait_reload(
    {
      let tx = tx.clone();
      move || {
        if let Err(e) = tx.send(SvcEvt::ReloadConf) {
          log::error!("Unable to send SvcEvt::ReloadConf event; {e}");
        }
      }
    },
    ks.clone()
  ));

  // SIGUSR1 is forwarded as a user-defined event (unix only).
  #[cfg(unix)]
  task::spawn(signals::wait_user1(
    {
      let tx = tx.clone();
      move || {
        if let Err(e) = tx.send(SvcEvt::User(UserSig::Sig1)) {
          log::error!("Unable to send SvcEvt::User(Sig1) event; {e}");
        }
      }
    },
    ks.clone()
  ));

  // SIGUSR2 is forwarded as a user-defined event (unix only).
  #[cfg(unix)]
  task::spawn(signals::wait_user2(
    {
      let tx = tx.clone();
      move || {
        if let Err(e) = tx.send(SvcEvt::User(UserSig::Sig2)) {
          log::error!("Unable to send SvcEvt::User(Sig2) event; {e}");
        }
      }
    },
    ks.clone()
  ));

  (tx, rx)
}

// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :