use futures::FutureExt;
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::panic::{panic_any, AssertUnwindSafe};
use tokio::time::Instant;
use tracing::field::display;
use tracing::{Instrument, Span};
use crate::breaker::Breaker;
use crate::core::{Context, Key, MainLoop, Scope, System};
use crate::error::{Escalation, RestartReason, StartupFailure, TerminationReason};
use crate::nursery::{Nursery, TerminationReport};
use crate::Child;
/// A supervisor that couples a system with a restart breaker.
///
/// `K` is the key type of the supervised children, `S` is the system that is
/// started against the supervisor's scope, and `B` is the breaker consulted
/// before each restart attempt.
pub struct SystemSupervisor<K, S, B> {
    /// The system started (and restarted) by this supervisor.
    system: S,
    /// Decides whether another restart is still permitted.
    breaker: B,
    /// Ties the otherwise unused key type `K` to the supervisor.
    phantom: PhantomData<K>,
}

impl<K, S, B> SystemSupervisor<K, S, B> {
    /// Builds a supervisor from a `system` and the `breaker` guarding its restarts.
    ///
    /// No trait bounds are required here; they are only demanded by the
    /// `impl` blocks that actually run the supervisor.
    pub fn new(system: S, breaker: B) -> Self {
        SystemSupervisor {
            system,
            breaker,
            phantom: PhantomData::<K>,
        }
    }
}
impl<K: Key, S: System<K>, B: Breaker<K>> SystemSupervisor<K, S, B> {
    /// Performs the initial startup and hands back the (not yet polled) main loop.
    ///
    /// A fresh scope is created around `ctx` with an empty history and a
    /// default nursery, then the system is started inside it. The returned
    /// future is the supervisor's main loop; the caller decides where to run it.
    ///
    /// # Panics
    ///
    /// If the initial startup fails, the failure is raised as a panic whose
    /// payload is `Escalation::Startup`.
    pub(crate) async fn start_inner(self, ctx: Context) -> impl Future<Output = ()> {
        let mut live = Live {
            scope: Scope {
                context: ctx,
                history: HashMap::new(),
                nursery: Nursery::default(),
            },
            sup: self,
        };

        match live.startup().await {
            Ok(()) => live.main(),
            Err(failure) => panic_any(Escalation::Startup(failure)),
        }
    }
}
impl<'a, K: Key, S: System<K>, B: Breaker<K>> Child<'a, ()> for SystemSupervisor<K, S, B> {
    /// Starts the supervisor as a child.
    ///
    /// The initial startup is awaited in place (via `start_inner`); the main
    /// loop it returns is then spawned as its own Tokio task, and the result
    /// of `tokio::spawn` is converted into the returned `MainLoop`.
    async fn start(self, ctx: Context) -> MainLoop<()> {
        let main_loop = self.start_inner(ctx).await;
        let task = tokio::spawn(main_loop);
        task.into()
    }
}
/// A supervisor bundled with the scope it operates on.
///
/// Created by `SystemSupervisor::start_inner`; its methods implement
/// startup, the main loop, restarts and teardown.
struct Live<K, S, B> {
/// Context, termination history and nursery used by the supervisor.
scope: Scope<K>,
/// The supervisor definition (system and breaker) being run.
sup: SystemSupervisor<K, S, B>,
}
impl<K: Key, S: System<K>, B: Breaker<K>> Live<K, S, B> {
/// Starts the system inside this supervisor's scope.
///
/// The start is raced against the context's settling token: if the token
/// settles first, `StartupFailure::Interrupted` is returned. Panics raised
/// while starting are caught; a payload that downcasts to `StartupFailure`
/// is returned as that failure, and any other payload is wrapped in
/// `StartupFailure::NonChildFailure`.
///
/// The outcome is also recorded on the current span: on failure the
/// failure message goes into `panic`, `otel.status_message` and
/// `otel.status_description` with status code `ERROR`; on success the
/// status code is `OK`.
#[tracing::instrument(
    target = "spry",
    level = "info",
    skip(self),
    fields(
        system = display(self.scope.context.name().to_string()),
        panic, otel.status_code, otel.status_message, otel.status_description
    )
)]
pub async fn startup(&mut self) -> Result<(), StartupFailure> {
    let span = Span::current();
    let settling = self.scope.context.settling_token().clone();

    let outcome = tokio::select! {
        _ = settling.settled() => Err(StartupFailure::Interrupted),
        caught = AssertUnwindSafe(async { self.sup.system.start(&mut self.scope).await }).catch_unwind() => {
            // Translate a caught panic payload into a startup failure.
            caught.map_err(|payload| match payload.downcast::<StartupFailure>() {
                Ok(known) => *known,
                Err(unknown) => StartupFailure::NonChildFailure(unknown),
            })
        }
    };

    match &outcome {
        Ok(_) => {
            span.record("otel.status_code", display("OK"));
        }
        Err(failure) => {
            let message = failure.message();
            span.record("panic", &message);
            span.record("otel.status_code", display("ERROR"));
            span.record("otel.status_message", &message);
            span.record("otel.status_description", &message);
        }
    }

    outcome
}
/// Runs the supervisor's main loop until the context settles.
///
/// Each termination reported by the nursery is handed to `term`. The loop
/// ends when the context settles, or when no `select!` branch can make
/// progress any more; afterwards `teardown` is awaited once.
pub async fn main(mut self) {
    loop {
        // The select only picks the next event; handling happens below so
        // that `term` runs with exclusive access to `self`.
        let report = tokio::select! {
            Some(report) = self.scope.nursery.next_termination() => report,
            () = self.scope.context.settled() => break,
            else => break
        };
        self.term(report).await;
    }

    self.teardown().await;
}
/// Handles one termination report from the nursery.
///
/// Records the termination reason and policy on the current span, stores
/// the outcome in the scope's history under the child's key, and, when the
/// report converts into a restart reason, keeps calling `try_restart`
/// until either no restart reason is left or `try_restart` returns `false`.
#[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(
        system = display(self.scope.context.name().to_string()),
        key = display(report.key.name()),
        burden = display(report.burden.tracing_name()),
        reason, policy, wants_restart, panic,
        otel.status_code, otel.status_message, otel.status_description
    )
)]
async fn term(&mut self, report: TerminationReport<K>) {
    let span = Span::current();
    record_reason(&span, &report.reason);

    let key = report.key.clone();
    let outcome = report.reason.as_outcome();
    self.scope.history.insert(key.clone(), outcome);

    span.record("policy", display(report.config.policy.tracing_name()));

    // Consumes the report; `Some` means a restart is wanted.
    let mut pending = report.into_restart_reason();
    span.record("wants_restart", pending.is_some());

    while pending.is_some() {
        let keep_going = self.try_restart(key.clone(), &mut pending).await;
        if !keep_going {
            break;
        }
    }
}
/// Attempts one restart cycle: teardown followed by a fresh startup.
///
/// `restart_reason` is updated in place: it becomes `None` when the
/// startup succeeds, and `Some(RestartReason::RestartFailure(..))` when it
/// fails for any reason other than interruption. Returns `false` only when
/// the startup was interrupted (leaving `restart_reason` untouched), and
/// `true` otherwise.
///
/// # Panics
///
/// If the breaker refuses the restart, panics with an
/// `Escalation::TooManyRestarts` payload carrying the current restart
/// reason. The caller is expected to pass `Some(..)`; with `None` the
/// `unwrap` panics instead.
#[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(
        system = display(self.scope.context.name().to_string()),
        allowed
    )
)]
async fn try_restart(&mut self, key: K, restart_reason: &mut Option<RestartReason>) -> bool {
    let span = Span::current();

    let now = Instant::now();
    let allowed = self.sup.breaker.may_restart(&key, now);
    span.record("allowed", allowed);

    if !allowed {
        panic_any(Escalation::TooManyRestarts(restart_reason.take().unwrap()))
    }

    self.teardown().await;

    *restart_reason = match self.startup().await {
        Ok(()) => None,
        // Interrupted: stop retrying and leave the reason as it was.
        Err(StartupFailure::Interrupted) => return false,
        Err(e) => Some(RestartReason::RestartFailure(e)),
    };
    true
}
/// Settles the nursery's children one at a time.
///
/// While `nursery.last()` yields a key, that key is settled and its
/// termination is awaited before moving on to the next one.
// NOTE(review): presumably `last()` is the most recently started child, so
// this stops children in reverse start order — confirm against `Nursery`.
#[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(system = display(self.scope.context.name().to_string()))
)]
async fn teardown(&mut self) {
    while let Some(key) = self.scope.nursery.last().cloned() {
        self.scope.nursery.settle(&key);
        self.wait_for_termination(key).await;
    }
}
/// Waits for the child identified by `key` to terminate.
///
/// The wait itself is instrumented with the current span. If the nursery
/// produces a termination report, its burden, policy and reason are
/// recorded on that span; if it produces nothing, nothing is recorded.
#[tracing::instrument(
    target = "spry",
    level = "info",
    skip_all,
    fields(
        system = display(self.scope.context.name().to_string()),
        key = display(key.name()),
        burden, reason, policy, wants_restart, panic,
        otel.status_code, otel.status_message, otel.status_description
    )
)]
async fn wait_for_termination(&mut self, key: K) {
    let span = Span::current();
    let termination = self.scope.nursery.wait_for_key(key).instrument(span.clone());

    if let Some(report) = termination.await {
        span.record("burden", display(report.burden.tracing_name()));
        span.record("policy", display(report.config.policy.tracing_name()));
        record_reason(&span, &report.reason);
    }
}
}
fn record_reason(span: &Span, reason: &TerminationReason) {
match reason {
TerminationReason::Normal => {
span.record("reason", display("normal"));
span.record("otel.status_code", display("OK"));
}
TerminationReason::Aborted => {
span.record("reason", display("abort"));
span.record("otel.status_code", display("OK"));
}
TerminationReason::Panicked(p) => {
span.record("reason", display("panic"));
span.record("otel.status_code", display("ERROR"));
fn record_panic_message(span: &Span, msg: &str) {
span.record("otel.status_description", display(msg));
span.record("otel.status_message", display(msg));
span.record("panic", display(msg));
}
if let Some(m) = p.downcast_ref::<&str>() {
record_panic_message(span, m)
} else if let Some(m) = p.downcast_ref::<String>() {
record_panic_message(span, m.as_str())
} else if let Some(m) = p.downcast_ref::<Escalation>() {
let s = format!("{:?}", m);
record_panic_message(span, s.as_str())
} else {
record_panic_message(span, "unknown")
};
}
};
}