// spry 0.0.4
//
// Resilient, self-healing async process hierarchies in the style of Erlang/OTP.
// Documentation
//! Types used in the configuration of child processes and subsystems

use futures::FutureExt;
use std::marker::PhantomData;
use std::panic::{panic_any, AssertUnwindSafe};
use std::time::Duration;
use tracing::Instrument;

use crate::breaker::{Breaker, RateLimited};
use crate::core::{ChildName, Key, MainLoop, Scope, System};
use crate::error::StartupFailure;
use crate::internal::{ChildConfig, Policy, StartFn};
use crate::policy::{Permanent, Temporary, Transient};
use crate::system::SystemSupervisor;

/// A scoped builder for configuring a child to start within a [System].
///
/// The builder mutably borrows the [Scope] it was created from for `'a` and is consumed by
/// `spawn`, which is where the child is actually started.
///
/// The type parameters on this struct are:
/// - `K`: The key type used to identify this child
/// - `P`: A type from the [crate::policy] module tagging the style for this child
/// - `R`: The type of values returned once this child has started
#[must_use]
pub struct ChildBuilder<'a, K, P, R> {
  /// Key identifying this child within `scope`.
  key: K,
  /// Function invoked with the child's context to start it.
  start_fn: StartFn<'a, R>,
  /// Scope the child is registered in once it has started.
  scope: &'a mut Scope<K>,
  /// Maximum time the child may take to settle; `None` means no limit.
  settling_timeout: Option<Duration>,
  /// Carries the policy tag `P` at the type level only; no `P` value is stored.
  phantom: PhantomData<P>,
}

impl<'a, K, P, R> ChildBuilder<'a, K, P, R>
where
  K: Key,
  P: Policy<R>,
{
  /// Settling timeout given to every freshly created builder.
  const DEFAULT_SETTLING_TIMEOUT: Duration = Duration::from_secs(5);

  /// Creates a builder for the child identified by `key` in `scope`, to be started by `start_fn`.
  ///
  /// The settling timeout starts out at [Self::DEFAULT_SETTLING_TIMEOUT].
  pub(crate) fn new(scope: &'a mut Scope<K>, key: K, start_fn: StartFn<'a, R>) -> Self {
    let settling_timeout = Some(Self::DEFAULT_SETTLING_TIMEOUT);
    Self { key, start_fn, scope, settling_timeout, phantom: PhantomData }
  }

  /// Re-tags this builder with a different policy, keeping every other setting as it is.
  ///
  /// See also [Self::into_permanent], [Self::into_transient], and [Self::into_temporary].
  pub fn with_policy<Lp: Policy<R>>(self) -> ChildBuilder<'a, K, Lp, R> {
    // Only the phantom policy tag changes type; all remaining fields are moved across untouched.
    let Self { key, start_fn, scope, settling_timeout, .. } = self;
    ChildBuilder { key, start_fn, scope, settling_timeout, phantom: PhantomData }
  }

  /// Shorthand for [Self::with_policy] with the [Permanent] policy.
  pub fn into_permanent(self) -> ChildBuilder<'a, K, Permanent, R> {
    self.with_policy::<Permanent>()
  }

  /// Shorthand for [Self::with_policy] with the [Transient] policy.
  pub fn into_transient(self) -> ChildBuilder<'a, K, Transient, R> {
    self.with_policy::<Transient>()
  }

  /// Shorthand for [Self::with_policy] with the [Temporary] policy.
  pub fn into_temporary(self) -> ChildBuilder<'a, K, Temporary, R> {
    self.with_policy::<Temporary>()
  }

  /// Sets the maximum amount of time this child is given to settle.
  pub fn with_settling_timeout(self, duration: Duration) -> Self {
    Self { settling_timeout: Some(duration), ..self }
  }

  /// Removes the settling timeout so this child may take as long as it likes to settle.
  ///
  /// **Warning**, use this setting very judiciously. It allows for permanently stalled shutdowns.
  pub fn with_settling_indefinite(self) -> Self {
    Self { settling_timeout: None, ..self }
  }
}

impl<'a, K, P, R> ChildBuilder<'a, K, P, R>
where
  K: Key,
  P: Policy<R>,
  R: 'a,
{
  /// Starts the configured child and registers it in the scope's nursery.
  ///
  /// If the policy `P` decides, from the history recorded for this key, that the child should
  /// be skipped, nothing is started and `P::fail()` is returned. Otherwise the start function
  /// is awaited and the value it produced is handed back through `P::output`.
  ///
  /// # Panics
  ///
  /// - With a [StartupFailure::ReusedKey] payload if the scope's nursery already contains this key.
  /// - With a [StartupFailure::ChildFailure] payload if the start function panics; the original
  ///   panic payload is carried inside the failure together with the child's name.
  pub async fn spawn(self) -> P::Output {
    // A key may only be live once per scope; reuse is reported as a startup failure panic.
    if self.scope.nursery.contains_key(&self.key) {
      let name = format!("{parent}.{child}", parent = self.scope.context.name(), child = self.key.name());
      panic_any(StartupFailure::ReusedKey(ChildName(name)));
    }

    // Let the policy inspect this key's history and veto the start entirely.
    if P::should_skip(self.scope.history.get(&self.key)) {
      return P::fail();
    }

    // Everything needed after startup is cloned out of the context here, because the context
    // itself is moved into the start function below.
    let ctx = self.scope.context.new_child_context(self.key.name());
    let shutdown_token = ctx.shutdown_token().clone();
    let config = ChildConfig { policy: P::concrete(), settling_timeout: self.settling_timeout };

    // Kept for the failure report in case the start function panics.
    let name = ctx.name().clone();

    // The span records the parent system's name and this child's key name.
    let span = tracing::info_span!(
      target: "spry", "spawn",
      system = display(self.scope.context.name().to_string()),
      key = display(self.key.name())
    );

    let settling_token = ctx.settling_token().clone();
    let termination_token = ctx.termination_token().clone();

    // Run the start function inside the span and turn a panic into an `Err` instead of letting
    // it unwind through this function directly.
    let fut = AssertUnwindSafe(async { (self.start_fn)(ctx).await }.instrument(span)).catch_unwind();
    // Only a single branch is selected over today; see the note inside.
    tokio::select! {
      biased;
      // note: we could consider trying to watch for the deaths of prior children here
      //       but today, we do not, instead we just finish the startup
      result = fut => match result {
        // Re-raise the child's panic wrapped with the child's name.
        Err(panic) => panic_any(StartupFailure::ChildFailure(name, panic)),
        Ok(MainLoop { handle, return_value }) => {
          // safety: we own scope mut/exclusively, so the check we did above will remain valid
          // The insert result is deliberately discarded for that reason.
          // NOTE(review): presumably `try_insert` can only fail on a duplicate key — confirm.
          let _ =
            self.scope.nursery.try_insert(self.key, config, shutdown_token, settling_token, termination_token, handle);
          P::output(return_value)
        }
      }
    }
  }
}

/// A scoped builder for configuring a subsystem to start within a [System].
///
/// The builder does nothing until it is consumed by `spawn`; dropping it without spawning
/// starts no subsystem.
///
/// The type parameters on this struct are:
/// - `K`: The key type used to identify this child
///   - Note, this may be *distinct* from the type of keys identifying children of this subsystem
/// - `P`: A type from the [crate::policy] module tagging the style policy for this subsystem
/// - `S`: The type of the [System] to start
/// - `B`: The type of the breaker policy for this subsystem
///
/// Subsystems are spawned with the style policy selected by `P` (see [Self::with_style]) and are
/// always allowed to take an arbitrary amount of time to settle.
#[must_use]
pub struct SystemBuilder<'a, K, P, S, B> {
  /// Key identifying this subsystem within `scope`.
  key: K,
  /// The system that will be supervised once spawned.
  system: S,
  /// Scope the subsystem is registered in once it has started.
  scope: &'a mut Scope<K>,
  /// Breaker handed to the subsystem's supervisor.
  breaker: B,
  /// Carries the policy tag `P` at the type level only; no `P` value is stored.
  phantom: PhantomData<P>,
}

impl<'a, K, P, S> SystemBuilder<'a, K, P, S, RateLimited> {
  /// Creates a builder for the subsystem `system`, identified by `key` within `scope`.
  ///
  /// The builder starts out with the default [RateLimited] breaker.
  pub(crate) fn new(key: K, system: S, scope: &'a mut Scope<K>) -> Self {
    Self { key, system, scope, breaker: RateLimited::default(), phantom: PhantomData }
  }
}

impl<'a, K, P, S, B> SystemBuilder<'a, K, P, S, B> {
  /// Re-tags this subsystem with a different style policy, keeping every other setting as it is.
  ///
  /// See also [Self::into_permanent], [Self::into_transient], and [Self::into_temporary].
  pub fn with_style<PP: Policy<()>>(self) -> SystemBuilder<'a, K, PP, S, B> {
    // Only the phantom policy tag changes type; all remaining fields are moved across untouched.
    let Self { key, system, scope, breaker, .. } = self;
    SystemBuilder { key, system, scope, breaker, phantom: PhantomData }
  }

  /// Shorthand for [Self::with_style] with the [Permanent] policy.
  pub fn into_permanent(self) -> SystemBuilder<'a, K, Permanent, S, B> {
    self.with_style::<Permanent>()
  }

  /// Shorthand for [Self::with_style] with the [Transient] policy.
  pub fn into_transient(self) -> SystemBuilder<'a, K, Transient, S, B> {
    self.with_style::<Transient>()
  }

  /// Shorthand for [Self::with_style] with the [Temporary] policy.
  pub fn into_temporary(self) -> SystemBuilder<'a, K, Temporary, S, B> {
    self.with_style::<Temporary>()
  }

  /// Swaps in `breaker` as the breaker for this subsystem, discarding the one configured before.
  pub fn with_breaker<Pp: Breaker<K>>(self, breaker: Pp) -> SystemBuilder<'a, K, P, S, Pp> {
    // The previous breaker is intentionally left behind in `self` and dropped with it.
    let Self { key, system, scope, .. } = self;
    SystemBuilder { key, system, scope, breaker, phantom: PhantomData }
  }
}

impl<'a, K, P, S, B> SystemBuilder<'a, K, P, S, B>
where
  K: Key,
  P: Policy<()>,
  S: System<K>,
  B: Breaker<K>,
{
  /// Starts the configured subsystem as a child of the scope.
  ///
  /// The system and its breaker are wrapped in a [SystemSupervisor], which is then spawned as a
  /// regular child under this builder's key, using the policy `P` and no settling timeout.
  pub async fn spawn(self) -> P::Output {
    let Self { key, scope, system, breaker, .. } = self;
    let supervisor = SystemSupervisor::new(system, breaker);
    // Subsystems are never held to a settling deadline.
    let child = scope.child(key, supervisor).with_policy::<P>().with_settling_indefinite();
    child.spawn().await
  }
}