spirit-tokio 0.9.2

Tokio helpers for Spirit
Documentation
//! An extension to start the tokio runtime at the appropriate time.

use std::cell::Cell;
use std::sync::{Arc, Mutex, PoisonError};

use err_context::prelude::*;
use err_context::AnyError;
use log::{debug, trace, warn};
use serde::de::DeserializeOwned;
#[cfg(feature = "rt-from-cfg")]
use serde::{Deserialize, Serialize};
use spirit::extension::{Extensible, Extension};
use spirit::validation::Action;
#[cfg(all(feature = "cfg-help", feature = "rt-from-cfg"))]
use structdoc::StructDoc;
use structopt::StructOpt;
use tokio::runtime::{Builder, Runtime};
use tokio::sync::oneshot::{self, Sender};

/// A guard preventing the tokio runtime from shutting down just yet.
///
/// One can keep the runtime alive by holding onto an instance of this.
///
/// Note that a forced shutdown is still possible. In case the `spirit`s `run` returns an error,
/// the runtime is terminated right away.
#[derive(Clone, Debug)]
pub struct ShutGuard(Arc<Sender<()>>);

thread_local! {
    static SHUT_GUARD: Cell<Option<ShutGuard>> = Cell::new(None);
}

fn with_shut_guard<R, F: FnOnce() -> R>(g: Option<ShutGuard>, f: F) -> R {
    SHUT_GUARD.with(|c| {
        // Install the ShutGuard to thread local storage so others can have a look at it.
        assert!(c.replace(g).is_none());
        struct Guard<'a>(&'a Cell<Option<ShutGuard>>);
        impl Drop for Guard<'_> {
            fn drop(&mut self) {
                self.0.take();
            }
        }
        // Make sure it is removed after we are done no matter what.
        let _g = Guard(c);

        f()
    })
}

/// Provides a shut guard.
///
/// Note that the guard is available only from within the hooks and main thread of run of `spirit`
/// and only after a [`Tokio`] has been plugged into it and before termination.
///
/// It is generally possible to pair a created resource (eg. a future) with an instance of this to
/// allow it to shut down gracefully.
///
/// Futures installed by [`FutureInstaller`][crate::FutureInstaller] already have this protection.
pub fn shut_guard() -> Option<ShutGuard> {
    SHUT_GUARD.with(|c| {
        let g = c.take();
        c.set(g.clone());
        g
    })
}

// From tokio documentation
#[cfg(feature = "rt-from-cfg")]
const DEFAULT_MAX_THREADS: usize = 512;
#[cfg(feature = "rt-from-cfg")]
const THREAD_NAME: &str = "tokio-runtime-worker";

/// Configuration for building a threaded runtime.
#[cfg(feature = "rt-from-cfg")]
#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Hash, Serialize)]
#[serde(rename_all = "kebab-case", default)]
#[cfg_attr(feature = "cfg-help", derive(StructDoc))]
#[non_exhaustive]
pub struct Config {
    /// Number of threads used for asynchronous processing.
    ///
    /// Defaults to number of available CPUs in the system if left unconfigured.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub core_threads: Option<usize>,

    /// Total maximum number of threads, including ones for blocking (non-async) operations.
    ///
    /// Must be at least `core-threads`
    #[serde(skip_serializing_if = "Option::is_none")]
    pub max_threads: Option<usize>,

    /// The thread name to use for the worker threads.
    pub thread_name: String,
}

#[cfg(feature = "rt-from-cfg")]
impl Default for Config {
    fn default() -> Self {
        Config {
            core_threads: None,
            max_threads: None,
            thread_name: THREAD_NAME.to_owned(),
        }
    }
}

#[cfg(feature = "rt-from-cfg")]
impl From<Config> for Builder {
    fn from(cfg: Config) -> Builder {
        let mut builder = Builder::new_multi_thread();
        let threads = cfg.core_threads.unwrap_or_else(num_cpus::get);
        builder.worker_threads(threads);
        let max = match cfg.max_threads {
            None if threads >= DEFAULT_MAX_THREADS => {
                warn!(
                    "Increasing max threads from implicit {} to {} to match core threads",
                    DEFAULT_MAX_THREADS, threads
                );
                threads
            }
            None => DEFAULT_MAX_THREADS,
            Some(max_threads) if max_threads <= threads => {
                warn!(
                    "Incrementing max threads from configured {} to {} to match core threads",
                    max_threads,
                    threads + 1
                );
                threads + 1
            }
            Some(max) => max,
        };
        builder.max_blocking_threads(max);
        builder.thread_name(cfg.thread_name);
        builder
    }
}

/// A [`spirit`] [`Extension`] to inject a [`tokio`] runtime.
///
/// This, when inserted into spirit [`Builder`][spirit::Builder] with the
/// [`with_singleton`][Extensible::with_singleton] will provide the application with a tokio
/// runtime.
///
/// This will:
///
/// * Run the application body inside the runtime (to allow spawning tasks and creating tokio
///   resources).
/// * Run the configuration/termination/signal hooks inside the async context of the runtime (for
///   similar reasons).
/// * Keep the runtime running until [`terminate`][spirit::Spirit::terminate] is invoked (either
///   explicitly or by CTRL+C or similar).
///
/// A default instance ([`Tokio::Default`]) is inserted by pipelines containing the
/// [`FutureInstaller`][crate::FutureInstaller].
#[non_exhaustive]
#[allow(clippy::type_complexity)] // While complex, the types are probably more readable inline
pub enum Tokio<O, C> {
    /// Provides the equivalent of [`Runtime::new`].
    #[cfg(feature = "multithreaded")]
    Default,

    /// A singlethreaded runtime.
    SingleThreaded,

    /// Allows the caller to provide arbitrary constructor for the [`Runtime`].
    ///
    /// This variant also allows creating the basic (non-threaded) scheduler if needed. Note that
    /// some operations with such runtime are prone to cause deadlocks.
    Custom(Box<dyn FnMut(&O, &Arc<C>) -> Result<Runtime, AnyError> + Send>),

    /// Uses configuration for constructing the [`Runtime`].
    ///
    /// This'll use the extractor (the first closure) to get a [`Config`], create a [`Builder`]
    /// based on that. It'll explicitly enable all the drivers and enable threaded runtime. Then it
    /// calls the postprocessor (the second closure) to turn it into the [`Runtime`].
    ///
    /// This is the more general form. If you're fine with just basing it on the configuration
    /// without much tweaking, you can use [`Tokio::from_cfg`] (which will create this variant with
    /// reasonable value of preprocessor).
    ///
    /// This is available only with the [`rt-from-cfg`] feature enabled.
    #[cfg(feature = "rt-from-cfg")]
    FromCfg(
        Box<dyn FnMut(&O, &C) -> Config + Send>,
        Box<dyn FnMut(Builder) -> Result<Runtime, AnyError> + Send>,
    ),
}

impl<O, C> Tokio<O, C> {
    /// Simplified construction from configuration.
    ///
    /// This is similar to [`Tokio::FromCfg`]. However, the extractor takes only the configuration
    /// structure, not the command line options. Furthermore, post-processing is simply calling
    /// [`Builder::build`], without a chance to tweak.
    #[cfg(feature = "rt-from-cfg")]
    pub fn from_cfg<E>(mut extractor: E) -> Self
    where
        E: FnMut(&C) -> Config + Send + 'static,
    {
        let extractor = move |_opts: &O, cfg: &C| extractor(cfg);
        let finish = |mut builder: Builder| -> Result<Runtime, AnyError> { Ok(builder.build()?) };
        Tokio::FromCfg(Box::new(extractor), Box::new(finish))
    }

    /// Method to create the runtime.
    ///
    /// This can be used when not taking advantage of the spirit auto-management features.
    pub fn create(&mut self, opts: &O, cfg: &Arc<C>) -> Result<Runtime, AnyError> {
        match self {
            #[cfg(feature = "multithreaded")]
            Tokio::Default => Runtime::new().map_err(AnyError::from),
            Tokio::SingleThreaded => Builder::new_current_thread()
                .enable_all()
                .build()
                .map_err(AnyError::from),
            Tokio::Custom(ctor) => ctor(opts, cfg),
            #[cfg(feature = "rt-from-cfg")]
            Tokio::FromCfg(extractor, postprocess) => {
                let cfg = extractor(opts, cfg);
                let mut builder: Builder = cfg.into();
                builder.enable_all();
                postprocess(builder)
            }
        }
    }
}

#[cfg(feature = "multithreaded")]
impl<O, C> Default for Tokio<O, C> {
    fn default() -> Self {
        Tokio::Default
    }
}

impl<E> Extension<E> for Tokio<E::Opts, E::Config>
where
    E: Extensible<Ok = E>,
    E::Config: DeserializeOwned + Send + Sync + 'static,
    E::Opts: StructOpt + Send + Sync + 'static,
{
    fn apply(mut self, ext: E) -> Result<E, AnyError> {
        trace!("Wrapping in tokio runtime");

        // The local hackonomicon:
        //
        // * We need to create a runtime and wrap both the application body in it and the
        //   callbacks/hooks of spirit, as they might want to create some futures/resources that
        //   need access to tokio.
        // * But that will be ready only after we did the configuration. By that time we no longer
        //   have access to anything that would allow us this kind of injection.
        // * Furthermore, we need even the following on_config callbacks/whatever else to be
        //   wrapped in the handle.
        //
        // Therefore we install the wrappers in right away and provide them with the data only once
        // it is ready. The mutexes will always be unlocked, all these things will in fact be
        // called from the same thread, it's just not possible to explain to Rust right now.

        let runtime = Arc::new(Mutex::new(None));
        let handle = Arc::new(Mutex::new(None));

        let init = {
            let mut initialized = false;
            let runtime = Arc::clone(&runtime);
            let handle = Arc::clone(&handle);
            #[cfg(feature = "rt-from-cfg")]
            let mut prev_cfg = None;

            // This runs as config validator to be sooner in the chain.
            move |_: &_, cfg: &Arc<_>, opts: &_| -> Result<Action, AnyError> {
                if initialized {
                    #[cfg(feature = "rt-from-cfg")]
                    if let Tokio::FromCfg(extract, _) = &mut self {
                        let prev = prev_cfg
                            .as_ref()
                            .expect("Should have stored config on init");
                        let new = extract(opts, cfg);
                        if prev != &new {
                            warn!("Tokio configuration differs, but can't be reloaded at run time");
                        }
                    }
                } else {
                    debug!("Creating the tokio runtime");
                    let new_runtime = self.create(opts, cfg).context("Tokio runtime creation")?;
                    let new_handle = new_runtime.handle().clone();
                    // We do so *right now* so the following config validators have some chance of
                    // having the runtime. It's one-shot anyway, so we are not *replacing* anything
                    // previous.
                    *runtime.lock().unwrap() = Some(new_runtime);
                    *handle.lock().unwrap() = Some(new_handle);
                    initialized = true;
                    #[cfg(feature = "rt-from-cfg")]
                    if let Tokio::FromCfg(extract, _) = &mut self {
                        prev_cfg = Some(extract(opts, cfg));
                    }
                }
                Ok(Action::new())
            }
        };

        let (terminate_send, terminate_recv) = oneshot::channel::<()>();
        let shutdown_guard = Arc::new(Mutex::new(Some(ShutGuard(Arc::new(terminate_send)))));
        let shutdown_guard_main = Arc::clone(&shutdown_guard);
        let shutdown_guard_drain = Arc::clone(&shutdown_guard);

        // Ugly hack: seems like Rust is not exactly ready to deal with our crazy type in here and
        // deduce it in just the function call :-(. Force it by an explicit type.
        #[allow(clippy::type_complexity)] // We are glad we managed to make it compile at all
        let around_hooks: Box<dyn for<'a> FnMut(Box<dyn FnOnce() + 'a>) + Send> =
            Box::new(move |inner| {
                let locked = handle
                    .lock()
                    // The inner may panic and we are OK with that, we just want to be able to run
                    // next time again.
                    .unwrap_or_else(PoisonError::into_inner);

                let shut_guard = shutdown_guard.lock().unwrap().clone();
                with_shut_guard(shut_guard, || {
                    if let Some(handle) = locked.as_ref() {
                        trace!("Wrapping hooks into tokio handle");
                        let _guard = handle.enter();
                        inner();
                        trace!("Leaving tokio handle");
                    } else {
                        // During startup, the handle/runtime is not *yet* ready. This is OK and
                        // expected, but we must run without it. And we also need to unlock that
                        // mutex, because the inner might actually contain the above init and want
                        // to provide the handle, so we don't want a deadlock.
                        drop(locked);
                        trace!("Running hooks without tokio handle, not available yet");
                        inner();
                    }
                })
            });

        ext
            .config_validator(init)
            .around_hooks(around_hooks)
            .on_terminate(move || {
                trace!("Removing global shutdown guard (others might be present in resources)");
                shutdown_guard_drain.lock().unwrap().take();
            })
            .run_around(move |spirit, inner| -> Result<(), AnyError> {
                // No need to worry about poisons here, we are going to be called just once
                let runtime = runtime.lock().unwrap().take().expect("Run even before config");
                debug!("Running with tokio runtime");
                let _guard = runtime.enter();
                let shut_guard = shutdown_guard_main.lock().unwrap().clone();
                let result = with_shut_guard(shut_guard, inner);
                debug!("Inner bodies ended");
                if result.is_ok() {
                    debug!("Waiting for spirit to terminate");
                    // It's OK to terminate with error here. We actually release it by dropping the
                    // last Arc to it.
                    let _ = runtime.block_on(terminate_recv);
                    debug!("Spirit signalled termination to runtime");
                    drop(runtime);
                } else {
                    warn!("Tokio runtime initialization body returned an error, trying to shut everything down");
                    runtime.shutdown_background();
                    spirit.terminate();
                }
                result
            })
    }
}