use crate::queue::FnOnceQueue;
use crate::rc::ActorRc;
use crate::{core::Nexus, ret, ret_some_do, Core, Deferrer, LogID, Ret, Share2, Stakker};
use slab::Slab;
use std::error::Error;
use std::fmt;
use std::ops::{Deref, DerefMut};
pub struct ActorOwn<A: 'static> {
actor: Actor<A>,
}
impl<A: 'static> ActorOwn<A> {
#[inline]
pub fn new(core: &mut Core, notify: Ret<StopCause>, parent_id: LogID) -> ActorOwn<A> {
Self::construct(Actor {
rc: ActorRc::new(core, Some(notify), parent_id),
})
}
fn construct(actor: Actor<A>) -> Self {
actor.rc.strong_inc();
Self { actor }
}
pub fn owned(&self) -> ActorOwn<A> {
ActorOwn::construct(self.actor.clone())
}
pub fn kill(&self, s: &mut Stakker, err: Box<dyn Error>) {
self.actor.terminate(s, StopCause::Killed(err));
}
pub fn kill_str(&self, s: &mut Stakker, err: &'static str) {
self.actor
.terminate(s, StopCause::Killed(Box::new(StrError(err))));
}
pub fn kill_string(&self, s: &mut Stakker, err: impl Into<String>) {
self.actor
.terminate(s, StopCause::Killed(Box::new(StringError(err.into()))));
}
pub fn anon(self) -> ActorOwnAnon {
ActorOwnAnon::new(self)
}
}
impl<A: 'static> Deref for ActorOwn<A> {
type Target = Actor<A>;
fn deref(&self) -> &Actor<A> {
&self.actor
}
}
impl<A: 'static> DerefMut for ActorOwn<A> {
fn deref_mut(&mut self) -> &mut Actor<A> {
&mut self.actor
}
}
impl<A: 'static> Drop for ActorOwn<A> {
fn drop(&mut self) {
let went_to_zero = self.actor.rc.strong_dec();
if went_to_zero {
let actor = self.actor.clone();
self.actor
.defer(move |s| actor.terminate(s, StopCause::Dropped));
}
}
}
#[allow(dead_code)]
pub struct ActorOwnAnon(Box<dyn ActorOwnAnonTrait>);
trait ActorOwnAnonTrait {}
impl<T: 'static> ActorOwnAnonTrait for ActorOwn<T> {}
impl ActorOwnAnon {
pub fn new<T: 'static>(actorown: ActorOwn<T>) -> Self {
Self(Box::new(actorown))
}
}
pub struct Actor<A: 'static> {
rc: ActorRc<A>,
}
#[derive(Eq, PartialEq, Copy, Clone)]
pub(crate) enum State {
Prep = 0,
Ready = 1,
Zombie = 2,
}
pub(crate) struct Prep {
pub(crate) queue: FnOnceQueue<Stakker>,
}
impl<A> Actor<A> {
pub fn is_zombie(&self) -> bool {
self.rc.is_zombie()
}
fn to_ready(&self, s: &mut Stakker, val: A) {
self.rc.to_ready(s, val);
}
fn terminate(&self, s: &mut Stakker, died: StopCause) {
if let Some(notify) = self.rc.to_zombie(s) {
self.log_termination(s, &died);
notify.ret(died);
}
}
fn log_termination(&self, core: &mut Core, died: &StopCause) {
if cfg!(feature = "logger") {
match died {
StopCause::Stopped => core.log_span_close(self.id(), format_args!(""), |_| {}),
StopCause::Failed(e) => {
core.log_span_close(self.id(), format_args!("{}", e), |out| {
out.kv_null(Some("failed"))
})
}
StopCause::Killed(e) => {
core.log_span_close(self.id(), format_args!("{}", e), |out| {
out.kv_null(Some("killed"))
})
}
StopCause::Dropped => core.log_span_close(self.id(), format_args!(""), |out| {
out.kv_null(Some("dropped"))
}),
StopCause::Lost => core
.log_span_close(self.id(), format_args!(""), |out| out.kv_null(Some("lost"))),
};
}
}
#[inline]
pub fn apply_prep(&self, s: &mut Stakker, f: impl FnOnce(&mut Cx<'_, A>) -> Option<A>) {
if self.rc.is_prep() {
let mut cx = Cx::new(&mut s.nexus, self);
let val = f(&mut cx);
if let Some(die) = cx.die {
self.terminate(s, die);
} else if let Some(val) = val {
self.to_ready(s, val);
}
}
}
#[inline]
pub fn apply(&self, s: &mut Stakker, f: impl FnOnce(&mut A, &mut Cx<'_, A>) + 'static) {
if let Some(val) = self.rc.borrow_ready(&mut s.actor_owner) {
let mut cx = Cx::new(&mut s.nexus, self);
f(val, &mut cx);
if let Some(die) = cx.die {
self.terminate(s, die);
}
} else if let Some(prep) = self.rc.borrow_prep(&mut s.actor_owner) {
let actor = self.clone();
prep.queue.push(move |s| actor.apply(s, f));
}
}
#[inline]
pub fn query<R>(
&self,
s: &mut Stakker,
f: impl FnOnce(&mut A, &mut Cx<'_, A>) -> R,
) -> Option<R> {
if let Some(val) = self.rc.borrow_ready(&mut s.actor_owner) {
let mut cx = Cx::new(&mut s.nexus, self);
let rv = f(val, &mut cx);
if let Some(die) = cx.die {
self.terminate(s, die);
}
Some(rv)
} else {
None
}
}
#[inline]
pub fn id(&self) -> LogID {
self.rc.id()
}
#[inline]
pub fn defer(&self, f: impl FnOnce(&mut Stakker) + 'static) {
self.rc.access_deferrer().defer(f);
}
#[inline]
pub fn access_deferrer(&self) -> &Deferrer {
self.rc.access_deferrer()
}
#[inline]
pub fn access_actor(&self) -> &Self {
self
}
#[inline]
pub fn access_log_id(&self) -> LogID {
self.rc.id()
}
}
impl<A> Clone for Actor<A> {
fn clone(&self) -> Self {
Self {
rc: self.rc.clone(),
}
}
}
pub enum StopCause {
Stopped,
Failed(Box<dyn Error>),
Killed(Box<dyn Error>),
Dropped,
Lost,
}
impl StopCause {
pub fn has_error(&self) -> bool {
matches!(self, StopCause::Failed(_) | StopCause::Killed(_))
}
}
impl std::fmt::Display for StopCause {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Stopped => write!(f, "Actor stopped"),
Self::Failed(e) => write!(f, "Actor failed: {}", e),
Self::Killed(e) => write!(f, "Actor was killed: {}", e),
Self::Dropped => write!(f, "Actor was dropped"),
Self::Lost => write!(f, "Lost connection to actor"),
}
}
}
impl std::fmt::Debug for StopCause {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self)
}
}
pub struct Cx<'a, A: 'static> {
pub(crate) nexus: &'a mut Nexus,
pub(crate) this: &'a Actor<A>,
pub(crate) die: Option<StopCause>,
}
impl<'a, A> Cx<'a, A> {
#[inline]
pub(super) fn new(nexus: &'a mut Nexus, this: &'a Actor<A>) -> Self {
Self {
this,
nexus,
die: None,
}
}
#[inline]
pub fn this(&self) -> &Actor<A> {
self.this
}
#[inline]
pub fn id(&self) -> LogID {
self.this.id()
}
#[inline]
pub fn stop(&mut self) {
if self.die.is_none() {
self.die = Some(StopCause::Stopped);
}
}
#[inline]
pub fn fail(&mut self, e: impl Error + 'static) {
if self.die.is_none() {
self.die = Some(StopCause::Failed(Box::new(e)));
}
}
#[inline]
pub fn fail_str(&mut self, e: &'static str) {
if self.die.is_none() {
self.die = Some(StopCause::Failed(Box::new(StrError(e))));
}
}
#[inline]
pub fn fail_string(&mut self, e: impl Into<String>) {
if self.die.is_none() {
self.die = Some(StopCause::Failed(Box::new(StringError(e.into()))));
}
}
#[inline]
pub fn share2_rw2<'b, T, U>(
&'b mut self,
s1: &'b Share2<T>,
s2: &'b Share2<U>,
) -> (&'b mut T, &'b mut U, &'b mut Core)
where
'a: 'b,
{
let (b1, b2) = self.nexus.share2_owner.rw2(&s1.rc, &s2.rc);
(b1, b2, &mut self.nexus.core)
}
#[inline]
pub fn share2_rw3<'b, T, U, V>(
&'b mut self,
s1: &'b Share2<T>,
s2: &'b Share2<U>,
s3: &'b Share2<V>,
) -> (&'b mut T, &'b mut U, &'b mut V, &'b mut Core)
where
'a: 'b,
{
let (b1, b2, b3) = self.nexus.share2_owner.rw3(&s1.rc, &s2.rc, &s3.rc);
(b1, b2, b3, &mut self.nexus.core)
}
#[inline]
pub fn access_actor(&self) -> &Actor<A> {
self.this
}
#[inline]
pub fn access_log_id(&self) -> LogID {
self.this.id()
}
}
impl<A> Deref for Cx<'_, A> {
type Target = Core;
fn deref(&self) -> &Core {
&self.nexus.core
}
}
impl<A> DerefMut for Cx<'_, A> {
fn deref_mut(&mut self) -> &mut Core {
&mut self.nexus.core
}
}
#[derive(Debug)]
pub(crate) struct StrError(&'static str);
impl Error for StrError {}
impl fmt::Display for StrError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
#[derive(Debug)]
pub(crate) struct StringError(pub String);
impl Error for StringError {}
impl fmt::Display for StringError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}
pub struct ActorOwnSlab<T: 'static> {
slab: Slab<ActorOwn<T>>,
}
impl<T: 'static> ActorOwnSlab<T> {
pub fn new() -> Self {
Self::default()
}
#[inline]
pub fn add<P>(
&mut self,
core: &mut Core,
parent: Actor<P>,
get_slab: impl for<'a> FnOnce(&'a mut P) -> &'a mut Self + 'static,
notify: Ret<StopCause>,
) -> Actor<T> {
let vacant = self.slab.vacant_entry();
let key = vacant.key();
let parid = parent.id();
let actorown = ActorOwn::new(
core,
ret_some_do!(move |cause| {
let parent2 = parent.clone();
parent.defer(move |s| {
parent2.apply(s, move |this, _| {
get_slab(this).slab.remove(key);
});
});
ret!([notify], cause);
}),
parid,
);
let actor = actorown.clone();
vacant.insert(actorown);
actor
}
#[inline]
pub fn len(&self) -> usize {
self.slab.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.slab.is_empty()
}
}
impl<T> Default for ActorOwnSlab<T> {
fn default() -> Self {
Self { slab: Slab::new() }
}
}
impl<'a, T> IntoIterator for &'a ActorOwnSlab<T> {
type Item = &'a ActorOwn<T>;
type IntoIter = ActorOwnSlabIter<'a, T>;
fn into_iter(self) -> ActorOwnSlabIter<'a, T> {
ActorOwnSlabIter(self.slab.iter())
}
}
pub struct ActorOwnSlabIter<'a, T: 'static>(slab::Iter<'a, ActorOwn<T>>);
impl<'a, T> Iterator for ActorOwnSlabIter<'a, T> {
type Item = &'a ActorOwn<T>;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|item| item.1)
}
}