//! spry 0.0.4
//!
//! Resilient, self-healing async process hierarchies in the style of Erlang/OTP.

use std::collections::HashMap;
use std::fmt::Display;
use std::future::Future;
use std::hash::Hash;
use std::marker::PhantomData;
use std::panic::AssertUnwindSafe;

use futures::FutureExt;
use tokio::task::JoinHandle;

use crate::breaker::{Breaker, RateLimited};
use crate::builder::{ChildBuilder, SystemBuilder};
use crate::error::Escalation;
use crate::internal;
use crate::nursery::Nursery;
use crate::policy::Permanent;
use crate::signals::{SettlingToken, ShutdownToken, TerminationToken};
use crate::system::SystemSupervisor;

/// Types that define a subsystem within the supervision hierarchy.
///
/// Note that unlike child processes, system start functions cannot return values and take `&self`
/// references instead of taking ownership. This is necessary as these functions will be called at
/// arbitrary points in time whenever the system needs to restart.
pub trait System<K: Key>: Send + Sync + 'static {
  /// Called to start and restart this subsystem.
  /// The [Scope] can be used to start child processes and subsystems.
  ///
  /// Note that at your convenience you may write impls for this function as
  /// `async fn start(&self, scope: &mut Scope<K>)`. The receiver must be `&self`, because the
  /// function may be invoked again on every restart.
  fn start(&self, scope: &mut Scope<K>) -> impl Future<Output = ()> + Send;
}

/// A value that knows how to launch a child process when handed its [Context].
///
/// The most common implementors are asynchronous functions and closures that accept a [Context]
/// and resolve to a [MainLoop], so child processes can be written as plain `async fn`s or even
/// closures.
pub trait Child<'a, R>
where
  Self: Send + 'a,
{
  /// Consumes the child definition and launches it with `ctx`.
  ///
  /// Implementors may conveniently write this as
  /// `async fn start(self, ctx: Context) -> MainLoop<R>`.
  fn start(self, ctx: Context) -> impl Future<Output = MainLoop<R>> + Send + 'a;
}

impl<'a, F, Fut, R> Child<'a, R> for F
where
  R: 'a,
  F: FnOnce(Context) -> Fut + Send + 'a,
  Fut: Future<Output = MainLoop<R>> + Send + 'a,
{
  // The call to `self` happens inside the async block, so (as with an `async fn`) the function
  // is not invoked until the returned future is first polled.
  fn start(self, ctx: Context) -> impl Future<Output = MainLoop<R>> + Send + 'a {
    async move {
      let launched = self(ctx);
      launched.await
    }
  }
}

/// The type of keys for naming child processes.
///
/// Keys must be cloneable, hashable, comparable for equality, sendable, and `'static`.
pub trait Key
where
  Self: Clone + Hash + Eq + Send + 'static,
{
  /// A normative name used in reporting. Each child is aware of its own name as a string.
  fn name(&self) -> String;
}

impl Key for () {
  fn name(&self) -> String {
    String::from("unit")
  }
}

// Implements `Key` for each listed type by using its `ToString` rendering as the name.
macro_rules! key_from_to_string {
  ($($ty:ty),*) => {
    $(
      impl Key for $ty {
        fn name(&self) -> String {
          ToString::to_string(self)
        }
      }
    )*
  };
}

key_from_to_string!(&'static str, String, usize, u64, u32, u16, u8, isize, i64, i32, i16, i8);

/// Non-normative, displayable name for a child process.
///
/// Child names are built as dotted paths: a parent's name followed by `.` and the child's key name.
// TODO: this type is clumsy, and so is everything touching it.
#[derive(Clone, Debug)]
pub struct ChildName(pub(crate) String);

impl Display for ChildName {
  /// Writes the name verbatim.
  ///
  /// Uses [`std::fmt::Formatter::pad`] so width, fill, alignment and precision flags
  /// (e.g. `{:>12}`) are honored exactly as for a plain `str`. A bare `{}` still produces the
  /// name unchanged.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.pad(&self.0)
  }
}

/// Services for defining the child processes and subsystems within a [System].
pub struct Scope<K> {
  // NOTE(review): presumably the context of the system that owns this scope — confirm.
  pub(crate) context: Context,
  // Outcome recorded per child key; presumably from earlier runs of the system — confirm.
  pub(crate) history: HashMap<K, internal::ProcessOutcome>,
  // NOTE(review): presumably collects the processes declared through this scope — confirm.
  pub(crate) nursery: Nursery<K>,
}

impl<K: Key> Scope<K> {
  /// Begins declaring a child process identified by `key` whose start function is `child`.
  ///
  /// The returned [ChildBuilder] starts out configured with the [Permanent] policy.
  pub fn child<'a, R, C: Child<'a, R>>(&'a mut self, key: K, child: C) -> ChildBuilder<'a, K, Permanent, R> {
    ChildBuilder::new(self, key, Box::new(move |ctx| Box::pin(child.start(ctx))))
  }

  /// Begins declaring a nested subsystem identified by `key` and defined by `system`.
  ///
  /// The returned [SystemBuilder] starts out configured with the [Permanent] policy and a
  /// [RateLimited] breaker.
  pub fn system<Kp: Key, S: System<Kp>>(&mut self, key: K, system: S) -> SystemBuilder<K, Permanent, S, RateLimited> {
    SystemBuilder::new(key, system, self)
  }
}

/// Services available to child processes within the supervision hierarchy.
///
/// Primarily, this is a way for a child to listen for the parent's request to settle.
pub struct Context {
  name: ChildName,
  settling_token: SettlingToken,
  shutdown_token: ShutdownToken,
  termination_token: TerminationToken,
}

impl Context {
  /// This child's name: its parent's name and its own key, joined with a dot.
  pub fn name(&self) -> &ChildName {
    &self.name
  }

  /// Completes once this child has been asked to settle.
  pub async fn settled(&self) {
    let token = &self.settling_token;
    token.settled().await
  }

  /// Reports whether this child has already been asked to settle.
  pub fn is_settled(&self) -> bool {
    let token = &self.settling_token;
    token.is_settled()
  }

  /// Borrows the portable token through which requests for this child to settle can be awaited
  /// or observed.
  pub fn settling_token(&self) -> &SettlingToken {
    &self.settling_token
  }

  /// Borrows the portable token through which the termination of this child can be detected.
  pub fn termination_token(&self) -> &TerminationToken {
    &self.termination_token
  }

  /// Borrows the portable token through which this child can be signalled to shut down.
  pub fn shutdown_token(&self) -> &ShutdownToken {
    &self.shutdown_token
  }

  /// Builds the context for a child named `"{parent}.{name_extension}"`.
  ///
  /// The child's settling, shutdown and termination tokens are freshly created with `::new()`
  /// rather than copied from this context.
  pub(crate) fn new_child_context(&self, name_extension: String) -> Self {
    let Self { name: parent, .. } = self;
    Self {
      name: ChildName(format!("{}.{}", parent.0, name_extension)),
      settling_token: SettlingToken::new(),
      shutdown_token: ShutdownToken::new(),
      termination_token: TerminationToken::new(),
    }
  }
}

/// Pairs a child's post-startup [JoinHandle] with the value handed back to the system start-up function.
pub struct MainLoop<R> {
  pub(crate) handle: JoinHandle<()>,
  pub(crate) return_value: R,
}

impl MainLoop<()> {
  /// Wraps `handle` for a child that hands nothing back to the system start-up function.
  pub fn new(handle: JoinHandle<()>) -> Self {
    Self::new_returning((), handle)
  }
}

impl<R> MainLoop<R> {
  /// Wraps `handle` together with `value`, which is handed back to the system start-up function.
  pub fn new_returning(value: R, handle: JoinHandle<()>) -> Self {
    Self { return_value: value, handle }
  }

  /// Keeps the same handle but replaces the carried value with `return_value`.
  pub fn with<Rp>(self, return_value: Rp) -> MainLoop<Rp> {
    let Self { handle, .. } = self;
    MainLoop { handle, return_value }
  }
}

impl From<JoinHandle<()>> for MainLoop<()> {
  /// Equivalent to [`MainLoop::new`].
  fn from(handle: JoinHandle<()>) -> Self {
    Self::new(handle)
  }
}

/// A top-level supervisor for a [System].
///
/// Typically, there's just a single one of these in an application.
pub struct Toplevel<K, S, B> {
  /// Name of the root of the hierarchy; moved into the root [Context] when the system starts.
  name: ChildName,
  /// Settling token for the root; at the top level, the shutdown token is derived from it.
  settling_token: SettlingToken,
  /// Token handed out by `shutdown_token()` so callers can ask the hierarchy to shut down.
  shutdown_token: ShutdownToken,
  /// Signalled by `start` after the root system has finished running.
  termination_token: TerminationToken,
  /// Breaker given to the root supervisor; `new` uses [RateLimited], `with_breaker` replaces it.
  breaker: B,
  /// The root system definition.
  system: S,
  /// Carries the key type `K` without storing a key.
  phantom: PhantomData<K>,
}

impl<K, S> Toplevel<K, S, RateLimited> {
  /// Creates a top-level supervisor named `name` for `system`, using a default [RateLimited] breaker.
  ///
  /// At the top level, settling and shutdown are the same signal: the shutdown token is derived
  /// from the settling token. The top-level system is only ever settled and is given unlimited
  /// time to complete.
  pub fn new<N: ToString>(name: N, system: S) -> Self {
    let name = ChildName(name.to_string());
    let settling_token = SettlingToken::new();
    let shutdown_token = settling_token.clone().into_shutdown_token();

    Self {
      name,
      settling_token,
      shutdown_token,
      termination_token: TerminationToken::new(),
      breaker: RateLimited::default(),
      system,
      phantom: PhantomData,
    }
  }
}

impl<K, S, P> Toplevel<K, S, P> {
  /// Swaps in a different breaker, keeping the name, tokens and system unchanged.
  pub fn with_breaker<Pp: Breaker<K>>(self, breaker: Pp) -> Toplevel<K, S, Pp> {
    let Toplevel { name, settling_token, shutdown_token, termination_token, system, .. } = self;
    Toplevel { name, settling_token, shutdown_token, termination_token, breaker, system, phantom: PhantomData }
  }

  /// Borrows the token that can be used to ask the whole hierarchy to shut down.
  pub fn shutdown_token(&self) -> &ShutdownToken {
    let token = &self.shutdown_token;
    token
  }

  /// Borrows the token that is signalled once `start` has finished running the system.
  pub fn termination_token(&self) -> &TerminationToken {
    let token = &self.termination_token;
    token
  }
}

impl<K: Key, S: System<K>, P: Breaker<K>> Toplevel<K, S, P> {
  /// Starts the root system, runs it to completion, then signals the termination token.
  ///
  /// # Errors
  ///
  /// Panics raised while starting or running the system are caught. The termination token is
  /// still signalled, and the panic is returned as an error:
  /// - a payload of type [Escalation] is returned as that escalation;
  /// - any other payload is wrapped in [Escalation::Unknown].
  #[tracing::instrument(target = "spry", level = "info", name = "toplevel", skip_all)]
  pub async fn start(self) -> Result<(), Escalation> {
    let termination_token = self.termination_token.clone();

    let context = Context {
      name: self.name,
      settling_token: self.settling_token.clone(),
      shutdown_token: self.settling_token.into_shutdown_token(), // For Toplevel, shutdown and settling are identical.
      termination_token: termination_token.clone(),
    };

    let sup = SystemSupervisor::new(self.system, self.breaker);

    // Guard the start-up phase as well as the main loop. If `start_inner` panicked outside of
    // `catch_unwind`, the panic would escape this function, the termination token would never
    // be signalled, and the panic would bypass the `Escalation` conversion below.
    let run = async move {
      let main = sup.start_inner(context).await;
      main.await
    };
    let result = AssertUnwindSafe(run).catch_unwind().await;

    termination_token.signal_termination();

    result.map_err(|panic| match panic.downcast::<Escalation>() {
      Ok(known) => *known,
      Err(unknown) => Escalation::Unknown(unknown),
    })
  }
}