use std::{any::Any, future::Future, panic::AssertUnwindSafe, sync::Arc, time::Duration};
use dashmap::DashMap;
use futures::FutureExt;
use fxhash::FxBuildHasher;
use parking_lot::RwLock;
use tracing::info;
use crate as elfo;
use elfo_macros::msg_raw as msg;
use elfo_utils::{CachePadded, ErrorChain};
use crate::{
actor::{Actor, ActorStatus},
addr::Addr,
config::Config,
context::Context,
envelope::Envelope,
errors::TrySendError,
exec::{Exec, ExecResult},
messages,
object::{Object, ObjectArc, ObjectMeta},
routers::{Outcome, Router},
tls, trace_id,
};
pub(crate) struct Supervisor<R: Router<C>, C, X> {
meta: Arc<ObjectMeta>,
context: Context,
objects: DashMap<R::Key, ObjectArc, FxBuildHasher>,
router: R,
exec: X,
control: CachePadded<RwLock<ControlBlock<C>>>,
}
struct ControlBlock<C> {
config: Option<Arc<C>>,
}
macro_rules! get_or_spawn {
($this:ident, $key:expr) => {{
let key = $key;
ward!($this.objects.get(&key), {
$this
.objects
.entry(key.clone())
.or_insert_with(|| $this.spawn(key))
.downgrade()
})
}};
}
impl<R, C, X> Supervisor<R, C, X>
where
R: Router<C>,
X: Exec<Context<C, R::Key>>,
<X::Output as Future>::Output: ExecResult,
C: Config,
{
pub(crate) fn new(ctx: Context, group: String, exec: X, router: R) -> Self {
let control = ControlBlock { config: None };
Self {
meta: Arc::new(ObjectMeta { group, key: None }),
context: ctx,
objects: DashMap::default(),
router,
exec,
control: CachePadded(RwLock::new(control)),
}
}
fn in_scope(&self, f: impl FnOnce()) {
tls::sync_scope(self.meta.clone(), tls::trace_id(), f);
}
pub(crate) fn handle(self: &Arc<Self>, envelope: Envelope) -> RouteReport {
msg!(match &envelope {
messages::ValidateConfig { config } => match config.decode::<C>() {
Ok(config) => {
let is_first_update = self.control.write().config.is_none();
if !is_first_update {
let outcome = self.router.route(&envelope);
let mut envelope = envelope;
envelope.set_message(messages::ValidateConfig { config });
self.do_handle(envelope, outcome.or(Outcome::Broadcast))
} else {
RouteReport::Done
}
}
Err(reason) => {
msg!(match envelope {
(messages::ValidateConfig { .. }, token) => {
let reject = messages::ConfigRejected { reason };
self.context.respond(token, Err(reject));
}
_ => unreachable!(),
});
RouteReport::Done
}
},
messages::UpdateConfig { config } => match config.decode::<C>() {
Ok(config) => {
let mut control = self.control.write();
let is_first_update = control.config.is_none();
control.config = config.get().cloned();
self.router
.update(&control.config.as_ref().expect("just saved"));
self.in_scope(
|| info!(config = ?control.config.as_ref().unwrap(), "router updated"),
);
drop(control);
let outcome = self.router.route(&envelope);
if !is_first_update {
let mut envelope = envelope;
envelope.set_message(messages::UpdateConfig { config });
self.do_handle(envelope, outcome.or(Outcome::Broadcast))
} else {
self.spawn_by_outcome(outcome);
RouteReport::Done
}
}
Err(reason) => {
msg!(match envelope {
(messages::UpdateConfig { .. }, token) => {
let reject = messages::ConfigRejected { reason };
self.context.respond(token, Err(reject));
}
_ => unreachable!(),
});
RouteReport::Done
}
},
_ => {
let outcome = self.router.route(&envelope);
self.do_handle(envelope, outcome)
}
})
}
pub(crate) fn do_handle(
self: &Arc<Self>,
envelope: Envelope,
outcome: Outcome<R::Key>,
) -> RouteReport {
match outcome {
Outcome::Unicast(key) => {
let object = get_or_spawn!(self, key);
let actor = object.as_actor().expect("supervisor stores only actors");
match actor.try_send(envelope) {
Ok(()) => RouteReport::Done,
Err(TrySendError::Full(envelope)) => RouteReport::Wait(object.addr(), envelope),
Err(TrySendError::Closed(envelope)) => RouteReport::Closed(envelope),
}
}
Outcome::Multicast(list) => {
let mut waiters = Vec::new();
let mut someone = false;
for key in list {
let object = get_or_spawn!(self, key);
let envelope = ward!(
envelope.duplicate(self.context.book()),
continue );
let actor = object.as_actor().expect("supervisor stores only actors");
match actor.try_send(envelope) {
Ok(_) => someone = true,
Err(TrySendError::Full(envelope)) => {
waiters.push((object.addr(), envelope))
}
Err(TrySendError::Closed(_)) => {}
}
}
if waiters.is_empty() {
if someone {
RouteReport::Done
} else {
RouteReport::Closed(envelope)
}
} else {
RouteReport::WaitAll(someone, waiters)
}
}
Outcome::Broadcast => {
let mut waiters = Vec::new();
let mut someone = false;
for object in self.objects.iter() {
let envelope = ward!(
envelope.duplicate(self.context.book()),
return RouteReport::Done );
let actor = object.as_actor().expect("supervisor stores only actors");
match actor.try_send(envelope) {
Ok(_) => someone = true,
Err(TrySendError::Full(envelope)) => {
waiters.push((object.addr(), envelope))
}
Err(TrySendError::Closed(_)) => {}
}
}
if waiters.is_empty() {
if someone {
RouteReport::Done
} else {
RouteReport::Closed(envelope)
}
} else {
RouteReport::WaitAll(someone, waiters)
}
}
Outcome::Discard | Outcome::Default => RouteReport::Done,
}
}
fn spawn(self: &Arc<Self>, key: R::Key) -> ObjectArc {
let entry = self.context.book().vacant_entry();
let addr = entry.addr();
let meta = Arc::new(ObjectMeta {
group: self.meta.group.clone(),
key: Some(key.to_string()),
});
let control = self.control.read();
let config = control.config.as_ref().cloned().expect("config is unset");
let ctx = self
.context
.clone()
.with_addr(addr)
.with_group(self.context.addr())
.with_key(key.clone())
.with_config(config);
drop(control);
let sv = self.clone();
let fut = self.exec.exec(ctx);
let fut = async move {
info!(%addr, "started");
let fut = AssertUnwindSafe(async { fut.await.unify() }).catch_unwind();
let new_status = match fut.await {
Ok(Ok(())) => ActorStatus::TERMINATED,
Ok(Err(err)) => ActorStatus::FAILED.with_details(ErrorChain(&*err)),
Err(panic) => ActorStatus::FAILED.with_details(panic_to_string(panic)),
};
let need_to_restart = new_status.is_failed();
sv.objects
.get(&key)
.expect("where is the current actor?")
.as_actor()
.expect("a supervisor stores only actors")
.set_status(new_status);
if need_to_restart {
tokio::time::sleep(Duration::from_secs(5)).await;
sv.objects.insert(key.clone(), sv.spawn(key))
} else {
sv.objects.remove(&key).map(|(_, v)| v)
}
.expect("where is the current actor?");
sv.context.book().remove(addr);
};
entry.insert(Object::new(addr, Actor::new(addr)));
let initial_trace_id = trace_id::generate();
tokio::spawn(tls::scope(meta, initial_trace_id, fut));
self.context.book().get_owned(addr).expect("just created")
}
fn spawn_by_outcome(self: &Arc<Self>, outcome: Outcome<R::Key>) {
match outcome {
Outcome::Unicast(key) => {
get_or_spawn!(self, key);
}
Outcome::Multicast(keys) => {
for key in keys {
get_or_spawn!(self, key);
}
}
Outcome::Broadcast | Outcome::Discard | Outcome::Default => {}
}
}
}
fn panic_to_string(payload: Box<dyn Any>) -> String {
if let Some(message) = payload.downcast_ref::<&str>() {
format!("panic: {}", message)
} else if let Some(message) = payload.downcast_ref::<String>() {
format!("panic: {}", message)
} else {
"panic: <unsupported payload>".to_string()
}
}
pub(crate) enum RouteReport {
Done,
Closed(Envelope),
Wait(Addr, Envelope),
WaitAll(bool, Vec<(Addr, Envelope)>),
}