acto 0.8.0

light-weight Actor library for Rust
Documentation
use crate::{
    snd_rcv::{BlackholeSender, MappedSender},
    Sender,
};
use parking_lot::Mutex;
use smol_str::SmolStr;
use std::{
    fmt::Debug,
    hash::Hash,
    sync::{
        atomic::{AtomicBool, AtomicUsize, Ordering},
        Arc,
    },
    task::Waker,
};

/// Every actor has an ID that is generated by `acto`, independent of the [`ActoRuntime`] used.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActoId(pub(crate) usize);

impl ActoId {
    pub(crate) fn next() -> Self {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        let id = COUNTER.fetch_add(1, Ordering::Relaxed);
        if id == usize::MAX {
            panic!("ActoId wrap-around! Cannot create more than {} actors", id)
        }
        Self(id)
    }
}

/// A handle for sending messages to an actor.
///
/// You may freely clone or share this handle and store it in collections.
pub struct ActoRef<M>(Arc<ActoRefInner<dyn Sender<M>>>);

pub(crate) struct ActoRefInner<S: ?Sized> {
    fields: Inner,
    id: ActoId,
    pub(crate) sender: S,
}

enum Inner {
    Straight(Straight),
    Mapped(Mapped),
    Blackhole,
}
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}

struct Straight {
    name: SmolStr,
    count: AtomicUsize,
    dead: AtomicBool,
    waker: Mutex<Option<Waker>>,
}

impl From<&Straight> for Mapped {
    fn from(s: &Straight) -> Self {
        Mapped {
            name: s.name.as_str(),
            count: &s.count,
            dead: &s.dead,
            waker: &s.waker,
        }
    }
}

/// safety: the pointers are guaranteed to not be dangling because the
/// associated `sender` holds a strong reference to the Arc providing them.
#[derive(Clone, Copy)]
struct Mapped {
    name: *const str,
    count: *const AtomicUsize,
    dead: *const AtomicBool,
    waker: *const Mutex<Option<Waker>>,
}

impl<T, U> PartialEq<ActoRef<U>> for ActoRef<T> {
    fn eq(&self, other: &ActoRef<U>) -> bool {
        self.0.id == other.0.id
    }
}

impl<T> Eq for ActoRef<T> {}

impl<T, U> PartialOrd<ActoRef<U>> for ActoRef<T> {
    fn partial_cmp(&self, other: &ActoRef<U>) -> Option<std::cmp::Ordering> {
        self.0.id.partial_cmp(&other.0.id)
    }
}

impl<T> Ord for ActoRef<T> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.0.id.cmp(&other.0.id)
    }
}

impl<T> Hash for ActoRef<T> {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.0.id.hash(state);
    }
}

impl<M> ActoRef<M> {
    pub(crate) fn new(id: ActoId, name: SmolStr, sender: impl Sender<M> + 'static) -> Self {
        Self(Arc::new(ActoRefInner {
            fields: Inner::Straight(Straight {
                name,
                count: AtomicUsize::new(0),
                dead: AtomicBool::new(false),
                waker: Mutex::new(None),
            }),
            id,
            sender,
        }))
    }

    /// The [`ActoId`] of the referenced actor.
    pub fn id(&self) -> ActoId {
        self.0.id
    }

    /// The actor’s given name plus ActoRuntime name and ActoId.
    pub fn name(&self) -> &str {
        match &self.0.fields {
            Inner::Straight(s) => s.name.as_str(),
            Inner::Mapped(m) => unsafe { &*m.name },
            Inner::Blackhole => "blackhole(acto/0)",
        }
    }

    #[inline(always)]
    fn with_count(&self, f: impl FnOnce(&AtomicUsize) -> usize) -> usize {
        match &self.0.fields {
            Inner::Straight(s) => f(&s.count),
            Inner::Mapped(m) => f(unsafe { &*m.count }),
            Inner::Blackhole => 0,
        }
    }

    pub(crate) fn get_count(&self) -> usize {
        self.with_count(|c| c.load(Ordering::SeqCst))
    }

    pub(crate) fn dead(&self) {
        match &self.0.fields {
            Inner::Straight(s) => s.dead.store(true, Ordering::Release),
            Inner::Mapped(m) => unsafe { &*m.dead }.store(true, Ordering::Release),
            Inner::Blackhole => {}
        }
    }

    pub(crate) fn waker(&self, waker: Waker) {
        match &self.0.fields {
            Inner::Straight(s) => *s.waker.lock() = Some(waker),
            Inner::Mapped(m) => *unsafe { &*m.waker }.lock() = Some(waker),
            Inner::Blackhole => {}
        }
    }

    pub(crate) fn wake(&self) {
        let waker = match &self.0.fields {
            Inner::Straight(s) => s.waker.lock().take(),
            Inner::Mapped(m) => unsafe { &*m.waker }.lock().take(),
            Inner::Blackhole => None,
        };
        if let Some(waker) = waker {
            waker.wake();
        }
    }

    /// Check whether the referenced actor is in principle still ready to receive messages.
    ///
    /// Note that this is not the same as [`ActoHandle::is_finished`], which checks whether
    /// the actor’s task is done. An actor could drop its [`ActoCell`] (yielding `true` here)
    /// or it could move it to another async task (yielding `true` from `ActoHandle`).
    ///
    /// Note that a “blackhole” reference is immortal.
    pub fn is_gone(&self) -> bool {
        match &self.0.fields {
            Inner::Straight(s) => s.dead.load(Ordering::Acquire),
            Inner::Mapped(m) => unsafe { &*m.dead }.load(Ordering::Acquire),
            Inner::Blackhole => false,
        }
    }
}

impl<M: Send + 'static> ActoRef<M> {
    /// Create a dummy reference that drops all messages
    pub fn blackhole() -> Self {
        Self(Arc::new(ActoRefInner {
            fields: Inner::Blackhole,
            id: ActoId::next(),
            sender: BlackholeSender::new(),
        }))
    }

    /// Send a message to the referenced actor.
    ///
    /// The employed channel may be at its capacity bound and the target actor may
    /// already be terminated, in which cases the message is dropped and `false` is
    /// returned.
    ///
    /// This method does not return the unsent message in the above cases because the
    /// message may not be delivered even though `true` is returned. In other words,
    /// you cannot rely on the return value to conclude that the message was delivered.
    /// Instead, have the target actor send a confirmation message back to you.
    pub fn send(&self, msg: M) -> bool {
        tracing::trace!(target = ?self, "send");
        self.0.sender.send(msg)
    }

    /// Send a message to the referenced actor and wait for it to be delivered.
    ///
    /// This method returns `false` if the message is dropped due to the target
    /// mailbox being closed (which is always true for [`ActoRef::blackhole`]).
    pub async fn send_wait(&self, msg: M) -> bool {
        tracing::trace!(target = ?self, "send_wait");
        self.0.sender.send_wait(msg).await
    }

    /// Derive an ActoRef accepting a different type of message, typically embedded in an `enum`.
    ///
    /// ```rust
    /// # use acto::{AcTokio, ActoRuntime, ActoCell, ActoInput};
    /// use tokio::sync::oneshot;
    /// async fn actor(mut cell: ActoCell<Option<oneshot::Sender<i32>>, impl ActoRuntime>) {
    ///     while let ActoInput::Message(Some(channel)) = cell.recv().await {
    ///         channel.send(42).ok();
    ///     }
    /// }
    /// let rt = AcTokio::new("test", 1).unwrap();
    /// let ar = rt.spawn_actor("a", actor).me.contramap(|msg| Some(msg));
    /// let (tx, rx) = oneshot::channel();
    /// ar.send(tx);
    /// # let response = rt.with_rt(|rt| rt.block_on(rx)).unwrap().unwrap();
    /// # assert_eq!(response, 42);
    /// ```
    pub fn contramap<M2: Send + 'static>(
        &self,
        f: impl Fn(M2) -> M + Send + Sync + 'static,
    ) -> ActoRef<M2> {
        let fields = match &self.0.fields {
            Inner::Straight(s) => Inner::Mapped(s.into()),
            Inner::Mapped(x) => Inner::Mapped(*x),
            Inner::Blackhole => Inner::Blackhole,
        };
        let count = self.with_count(|c| c.fetch_add(1, Ordering::SeqCst));
        tracing::trace!(count, ?self, "contramap");
        let orig = self.0.clone();
        let inner = ActoRefInner {
            fields,
            id: orig.id,
            sender: MappedSender::new(f, orig),
        };
        ActoRef(Arc::new(inner))
    }

    pub fn is_blackhole(&self) -> bool {
        matches!(self.0.fields, Inner::Blackhole)
    }
}

impl<M> Clone for ActoRef<M> {
    fn clone(&self) -> Self {
        let count = self.with_count(|c| c.fetch_add(1, Ordering::SeqCst));
        tracing::trace!(count, ?self, "clone");
        Self(self.0.clone())
    }
}

impl<M> Drop for ActoRef<M> {
    fn drop(&mut self) {
        let count = self.with_count(|c| c.fetch_sub(1, Ordering::SeqCst));
        tracing::trace!(count, ?self, "drop");
        if count == 1 {
            self.wake()
        }
    }
}

impl<M> Debug for ActoRef<M> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ActoRef({})", self.name())
    }
}