use futures::FutureExt;
use std::marker::PhantomData;
use std::panic::{panic_any, AssertUnwindSafe};
use std::time::Duration;
use tracing::Instrument;
use crate::breaker::{Breaker, RateLimited};
use crate::core::{ChildName, Key, MainLoop, Scope, System};
use crate::error::StartupFailure;
use crate::internal::{ChildConfig, Policy, StartFn};
use crate::policy::{Permanent, Temporary, Transient};
use crate::system::SystemSupervisor;
/// Builder that configures a single child before it is spawned into a [`Scope`].
///
/// The type parameter `P` selects the restart policy at the type level; it is
/// carried only through `PhantomData` and can be swapped with `with_policy`.
/// Nothing is started until `spawn` is awaited.
#[must_use]
pub struct ChildBuilder<'a, K, P, R> {
    /// Key identifying the child inside the scope.
    key: K,
    /// Function invoked with the child's context when the child is spawned.
    start_fn: StartFn<'a, R>,
    /// Scope the child will be registered in.
    scope: &'a mut Scope<K>,
    /// Settling timeout forwarded to the child's configuration; `None` means
    /// no timeout is configured.
    settling_timeout: Option<Duration>,
    /// Zero-sized marker for the policy type `P`.
    phantom: PhantomData<P>,
}
impl<'a, K, P, R> ChildBuilder<'a, K, P, R>
where
    K: Key,
    P: Policy<R>,
{
    /// Settling timeout applied to every builder created through [`Self::new`].
    const DEFAULT_SETTLING_TIMEOUT: Duration = Duration::from_secs(5);

    /// Creates a builder for `key` in `scope`, using the default settling timeout.
    pub(crate) fn new(scope: &'a mut Scope<K>, key: K, start_fn: StartFn<'a, R>) -> Self {
        let settling_timeout = Some(Self::DEFAULT_SETTLING_TIMEOUT);
        Self { key, start_fn, scope, settling_timeout, phantom: PhantomData }
    }

    /// Re-types the builder to use policy `Q`, keeping every other setting as is.
    pub fn with_policy<Q: Policy<R>>(self) -> ChildBuilder<'a, K, Q, R> {
        // Only the marker type changes, so all value fields are moved over untouched.
        let Self { key, start_fn, scope, settling_timeout, phantom: _ } = self;
        ChildBuilder { key, start_fn, scope, settling_timeout, phantom: PhantomData }
    }

    /// Shorthand for `with_policy::<Permanent>()`.
    pub fn into_permanent(self) -> ChildBuilder<'a, K, Permanent, R> {
        self.with_policy::<Permanent>()
    }

    /// Shorthand for `with_policy::<Transient>()`.
    pub fn into_transient(self) -> ChildBuilder<'a, K, Transient, R> {
        self.with_policy::<Transient>()
    }

    /// Shorthand for `with_policy::<Temporary>()`.
    pub fn into_temporary(self) -> ChildBuilder<'a, K, Temporary, R> {
        self.with_policy::<Temporary>()
    }

    /// Replaces the settling timeout with `duration`.
    pub fn with_settling_timeout(self, duration: Duration) -> Self {
        Self { settling_timeout: Some(duration), ..self }
    }

    /// Removes the settling timeout altogether (stored as `None`).
    pub fn with_settling_indefinite(self) -> Self {
        Self { settling_timeout: None, ..self }
    }
}
impl<'a, K, P, R> ChildBuilder<'a, K, P, R>
where
K: Key,
P: Policy<R>,
R: 'a,
{
pub async fn spawn(self) -> P::Output {
if self.scope.nursery.contains_key(&self.key) {
let name = format!("{parent}.{child}", parent = self.scope.context.name(), child = self.key.name());
panic_any(StartupFailure::ReusedKey(ChildName(name)));
}
if P::should_skip(self.scope.history.get(&self.key)) {
return P::fail();
}
let ctx = self.scope.context.new_child_context(self.key.name());
let shutdown_token = ctx.shutdown_token().clone();
let config = ChildConfig { policy: P::concrete(), settling_timeout: self.settling_timeout };
let name = ctx.name().clone();
let span = tracing::info_span!(
target: "spry", "spawn",
system = display(self.scope.context.name().to_string()),
key = display(self.key.name())
);
let settling_token = ctx.settling_token().clone();
let termination_token = ctx.termination_token().clone();
let fut = AssertUnwindSafe(async { (self.start_fn)(ctx).await }.instrument(span)).catch_unwind();
tokio::select! {
biased;
result = fut => match result {
Err(panic) => panic_any(StartupFailure::ChildFailure(name, panic)),
Ok(MainLoop { handle, return_value }) => {
let _ =
self.scope.nursery.try_insert(self.key, config, shutdown_token, settling_token, termination_token, handle);
P::output(return_value)
}
}
}
}
}
/// Builder that configures a system before it is spawned, under supervision,
/// as a child of a [`Scope`].
///
/// `P` selects the restart policy at the type level and `B` is the breaker
/// handed to the supervisor. Nothing is started until `spawn` is awaited, so
/// the builder is marked `#[must_use]`, matching [`ChildBuilder`].
#[must_use]
pub struct SystemBuilder<'a, K, P, S, B> {
    /// Key identifying the system inside the scope.
    key: K,
    /// The system to be supervised.
    system: S,
    /// Scope the supervised system will be registered in.
    scope: &'a mut Scope<K>,
    /// Breaker passed to the supervisor together with the system.
    breaker: B,
    /// Zero-sized marker for the policy type `P`.
    phantom: PhantomData<P>,
}
impl<'a, K, P, S> SystemBuilder<'a, K, P, S, RateLimited> {
    /// Creates a builder for `system` under `key`, using the default
    /// [`RateLimited`] breaker.
    pub(crate) fn new(key: K, system: S, scope: &'a mut Scope<K>) -> Self {
        Self {
            key,
            system,
            scope,
            breaker: RateLimited::default(),
            phantom: PhantomData,
        }
    }
}
impl<'a, K, P, S, B> SystemBuilder<'a, K, P, S, B> {
    /// Re-types the builder to use policy `NewPolicy`, keeping the key, system,
    /// scope and breaker as they are.
    pub fn with_style<NewPolicy: Policy<()>>(self) -> SystemBuilder<'a, K, NewPolicy, S, B> {
        let Self { key, system, scope, breaker, phantom: _ } = self;
        SystemBuilder { key, system, scope, breaker, phantom: PhantomData }
    }

    /// Shorthand for `with_style::<Permanent>()`.
    pub fn into_permanent(self) -> SystemBuilder<'a, K, Permanent, S, B> {
        self.with_style::<Permanent>()
    }

    /// Shorthand for `with_style::<Transient>()`.
    pub fn into_transient(self) -> SystemBuilder<'a, K, Transient, S, B> {
        self.with_style::<Transient>()
    }

    /// Shorthand for `with_style::<Temporary>()`.
    pub fn into_temporary(self) -> SystemBuilder<'a, K, Temporary, S, B> {
        self.with_style::<Temporary>()
    }

    /// Replaces the current breaker with `breaker`, changing the builder's
    /// breaker type accordingly.
    pub fn with_breaker<NewBreaker: Breaker<K>>(
        self,
        breaker: NewBreaker,
    ) -> SystemBuilder<'a, K, P, S, NewBreaker> {
        // The previous breaker stays behind in `self` and is dropped on return.
        let Self { key, system, scope, .. } = self;
        SystemBuilder { key, system, scope, breaker, phantom: PhantomData }
    }
}
impl<'a, K, P, S, B> SystemBuilder<'a, K, P, S, B>
where
    K: Key,
    P: Policy<()>,
    S: System<K>,
    B: Breaker<K>,
{
    /// Wraps the system and its breaker in a [`SystemSupervisor`] and spawns
    /// that supervisor as a child of the scope under this builder's key.
    ///
    /// The child is spawned with policy `P` and with no settling timeout.
    pub async fn spawn(self) -> P::Output {
        let Self { key, system, scope, breaker, phantom: _ } = self;
        let supervisor = SystemSupervisor::new(system, breaker);
        let child = scope.child(key, supervisor).with_policy::<P>().with_settling_indefinite();
        child.spawn().await
    }
}