// spry 0.0.4
//
// Resilient, self-healing async process hierarchies in the style of Erlang/OTP.
use futures::FutureExt;
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::panic::{panic_any, AssertUnwindSafe};
use tokio::time::Instant;
use tracing::field::display;
use tracing::{Instrument, Span};

use crate::breaker::Breaker;
use crate::core::{Context, Key, MainLoop, Scope, System};
use crate::error::{Escalation, RestartReason, StartupFailure, TerminationReason};
use crate::nursery::{Nursery, TerminationReport};
use crate::Child;

/// Supervises a `System` of children. When a child terminates and a restart
/// is wanted, the whole system is torn down and started again, as long as the
/// `Breaker` keeps allowing restarts.
///
/// `K` is the key type identifying children. No value of it is stored; it is
/// only carried via `PhantomData`.
pub struct SystemSupervisor<K, S, B> {
  // the system whose children this supervisor starts (and restarts)
  system: S,
  // consulted before every restart; a refusal escalates as `TooManyRestarts`
  breaker: B,
  // marks the key type `K` without holding a value of it
  phantom: PhantomData<K>,
}

impl<K, S, B> SystemSupervisor<K, S, B> {
  pub fn new(system: S, breaker: B) -> Self {
    Self { system, breaker, phantom: PhantomData }
  }
}

impl<K: Key, S: System<K>, B: Breaker<K>> SystemSupervisor<K, S, B> {
  /// Starts the system's children and returns the supervision loop.
  ///
  /// Startup is awaited to completion here, in the caller's task. The
  /// returned future is the steady-state loop (`Live::main`); it does nothing
  /// until it is polled.
  ///
  /// # Panics
  ///
  /// If startup fails for any reason (including `StartupFailure::Interrupted`),
  /// this panics with `Escalation::Startup` carrying the failure.
  pub(crate) async fn start_inner(self, ctx: Context) -> impl Future<Output = ()> {
    let scope = Scope { context: ctx, history: HashMap::new(), nursery: Nursery::default() };
    let mut live = Live { sup: self, scope };

    let outcome = live.startup().await;
    match outcome {
      Ok(()) => live.main(),
      Err(failure) => panic_any(Escalation::Startup(failure)),
    }
  }
}

impl<'a, K: Key, S: System<K>, B: Breaker<K>> Child<'a, ()> for SystemSupervisor<K, S, B> {
  /// Starts the supervised system, then spawns its supervision loop as its
  /// own tokio task. That task is returned as this child's `MainLoop`.
  ///
  /// Startup is awaited before anything is spawned. A startup failure
  /// therefore surfaces (as a panic from `start_inner`) in the caller's task.
  async fn start(self, ctx: Context) -> MainLoop<()> {
    let supervision = self.start_inner(ctx).await;

    // todo: let *users* choose how the supervision loop gets spawned
    //   - it moves us toward runtime agnosticism
    //   - it would let a supervisor be placed more specifically, e.g. on a local runtime
    //   - the work-stealing `Send` constraints may have to stay (that's fine)
    //   - but lifetimes might leave room for something fancier
    let task = tokio::spawn(supervision);
    task.into()
  }
}

/// A started supervisor: the supervisor configuration together with the
/// runtime scope it operates on.
struct Live<K, S, B> {
  // context, per-key termination history, and the nursery of running children
  scope: Scope<K>,
  // the system to (re)start and the breaker that gates restarts
  sup: SystemSupervisor<K, S, B>,
}

impl<K: Key, S: System<K>, B: Breaker<K>> Live<K, S, B> {
  #[tracing::instrument(
    target = "spry",
    level = "info",
    skip(self),
    fields(
      system = display(self.scope.context.name().to_string()),
      panic, otel.status_code, otel.status_message, otel.status_description
    )
  )]
  pub async fn startup(&mut self) -> Result<(), StartupFailure> {
    let span = Span::current();

    let settling = self.scope.context.settling_token().clone();
    let result = tokio::select! {
      _ = settling.settled() => Err(StartupFailure::Interrupted),
      result = AssertUnwindSafe(async { self.sup.system.start(&mut self.scope).await }).catch_unwind() => match result {
        Ok(x) => Ok(x),
        Err(panic) => match panic.downcast() {
          Ok(known) => Err(*known),
          Err(unknown) => Err(StartupFailure::NonChildFailure(unknown)),
        }
      }
    };

    if let Err(failure) = &result {
      let message = failure.message();
      span.record("panic", &message);
      span.record("otel.status_code", display("ERROR"));
      span.record("otel.status_message", &message);
      span.record("otel.status_description", &message);
    } else {
      span.record("otel.status_code", display("OK"));
    }

    result
  }

  pub async fn main(mut self) {
    loop {
      tokio::select! {
        Some(report) = self.scope.nursery.next_termination() => {
          self.term(report).await
        },
        () = self.scope.context.settled() => break,
        else => break
      }
    }

    // as a final step, always just clean up the children
    // this is necessary to catch settlements
    self.teardown().await;
  }

  #[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(
      system = display(self.scope.context.name().to_string()),
      key = display(report.key.name()),
      burden = display(report.burden.tracing_name()),
      reason, policy, wants_restart, panic,
      otel.status_code, otel.status_message, otel.status_description
    )
  )]
  async fn term(&mut self, report: TerminationReport<K>) {
    let span = Span::current();

    record_reason(&span, &report.reason);

    self.scope.history.insert(report.key.clone(), report.reason.as_outcome());
    span.record("policy", display(report.config.policy.tracing_name()));

    let key = report.key.clone();
    let mut restart_reason = report.into_restart_reason();

    span.record("wants_restart", restart_reason.is_some());

    while restart_reason.is_some() && self.try_restart(key.clone(), &mut restart_reason).await {}
  }

  #[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(
      system = display(self.scope.context.name().to_string()),
      allowed
    )
  )]
  async fn try_restart(&mut self, key: K, restart_reason: &mut Option<RestartReason>) -> bool {
    let span = Span::current();
    let allowed = self.sup.breaker.may_restart(&key, Instant::now());
    span.record("allowed", allowed);

    if !allowed {
      panic_any(Escalation::TooManyRestarts(restart_reason.take().unwrap()))
    }

    self.teardown().await;

    // try to restart the children
    match self.startup().await {
      Ok(()) => *restart_reason = None,
      Err(StartupFailure::Interrupted) => return false,
      Err(e) => *restart_reason = Some(RestartReason::RestartFailure(e)),
    }

    true
  }

  // kill all living children in reverse order
  // post-condition: the nursery is empty
  #[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(system = display(self.scope.context.name().to_string()))
  )]
  async fn teardown(&mut self) {
    loop {
      let Some(k) = self.scope.nursery.last().cloned() else { break };
      self.scope.nursery.settle(&k);
      self.wait_for_termination(k).await;
    }
  }

  #[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(
      system = display(self.scope.context.name().to_string()),
      key = display(key.name()),
      burden, reason, policy, wants_restart, panic,
      otel.status_code, otel.status_message, otel.status_description
    )
  )]
  async fn wait_for_termination(&mut self, key: K) {
    let span = Span::current();

    match self.scope.nursery.wait_for_key(key).instrument(span.clone()).await {
      None => {}
      Some(report) => {
        span.record("burden", display(report.burden.tracing_name()));
        span.record("policy", display(report.config.policy.tracing_name()));
        record_reason(&span, &report.reason);
      }
    }
  }
}

fn record_reason(span: &Span, reason: &TerminationReason) {
  match reason {
    TerminationReason::Normal => {
      span.record("reason", display("normal"));
      span.record("otel.status_code", display("OK"));
    }
    TerminationReason::Aborted => {
      span.record("reason", display("abort"));
      span.record("otel.status_code", display("OK"));
    }
    TerminationReason::Panicked(p) => {
      span.record("reason", display("panic"));
      span.record("otel.status_code", display("ERROR"));

      fn record_panic_message(span: &Span, msg: &str) {
        span.record("otel.status_description", display(msg));
        span.record("otel.status_message", display(msg));
        span.record("panic", display(msg));
      }

      if let Some(m) = p.downcast_ref::<&str>() {
        record_panic_message(span, m)
      } else if let Some(m) = p.downcast_ref::<String>() {
        record_panic_message(span, m.as_str())
      } else if let Some(m) = p.downcast_ref::<Escalation>() {
        let s = format!("{:?}", m);
        record_panic_message(span, s.as_str())
      } else {
        record_panic_message(span, "unknown")
      };
    }
  };
}