use crate::{
snd_rcv::{BlackholeSender, MappedSender},
Sender,
};
use parking_lot::Mutex;
use smol_str::SmolStr;
use std::{
fmt::Debug,
hash::Hash,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
task::Waker,
};
/// Process-wide identifier of an actor.
///
/// Identifiers are handed out by [`ActoId::next`] from a single global counter, starting at
/// zero and increasing by one per call, so comparing two ids reflects the order in which
/// they were allocated.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActoId(pub(crate) usize);
impl ActoId {
    /// Allocates the next unused identifier.
    ///
    /// # Panics
    ///
    /// Panics once the counter has reached `usize::MAX`. The counter saturates at that
    /// value instead of wrapping around, so *every* call past the limit panics and an
    /// identifier is never handed out twice (a plain `fetch_add` would wrap to zero after
    /// the first panic and silently start re-issuing ids).
    pub(crate) fn next() -> Self {
        static COUNTER: AtomicUsize = AtomicUsize::new(0);
        // Only uniqueness of the returned value matters, no other memory is published
        // through the counter, hence `Relaxed` for both success and failure.
        let claimed = COUNTER.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
            current.checked_add(1)
        });
        match claimed {
            Ok(id) => Self(id),
            Err(id) => panic!("ActoId wrap-around! Cannot create more than {} actors", id),
        }
    }
}
/// Handle to an actor that accepts messages of type `M`.
///
/// Wraps a reference-counted `ActoRefInner` whose sender is type-erased to `dyn Sender<M>`.
/// Equality, ordering and hashing of handles are based solely on the contained `ActoId`.
pub struct ActoRef<M>(Arc<ActoRefInner<dyn Sender<M>>>);
/// Shared payload behind an `ActoRef`.
///
/// `S` may be unsized (it is `dyn Sender<M>` inside `ActoRef`), which is why `sender` has
/// to remain the last field.
pub(crate) struct ActoRefInner<S: ?Sized> {
/// Where the bookkeeping state (name, count, dead flag, waker) of this reference lives.
fields: Inner,
/// Identifier returned by `ActoRef::id` and used for comparison and hashing.
id: ActoId,
/// Sink that `ActoRef::send` and `ActoRef::send_wait` forward messages to.
pub(crate) sender: S,
}
/// Location of the bookkeeping state of an `ActoRef`.
enum Inner {
/// The state is owned by this very reference (created by `ActoRef::new`).
Straight(Straight),
/// The state lives in the `Straight` of another reference and is reached through raw
/// pointers (created by `ActoRef::contramap`).
Mapped(Mapped),
/// No state at all: the name is a fixed string, the count always reads as zero, nothing
/// is ever stored for the dead flag or the waker, and `is_gone` reports `false`.
Blackhole,
}
// `Mapped` consists of raw pointers, which are neither `Send` nor `Sync` by themselves, so
// both auto traits have to be asserted manually for `Inner`.
//
// SAFETY: NOTE(review) — the pointees are a `str`, two atomics and a mutex-guarded waker
// slot, all taken from a `Straight` (see `From<&Straight> for Mapped`). Soundness
// additionally requires that this `Straight` outlives every `Mapped` copy; presumably the
// `MappedSender` created in `contramap` keeps the originating `Arc` alive for that
// purpose — confirm against `MappedSender`.
unsafe impl Send for Inner {}
unsafe impl Sync for Inner {}
/// Bookkeeping state owned by a reference created through `ActoRef::new`.
struct Straight {
/// Name returned by `ActoRef::name`.
name: SmolStr,
/// Counter that starts at zero, is incremented by `clone` and `contramap`, and is
/// decremented when an `ActoRef` is dropped.
count: AtomicUsize,
/// Flag set by `ActoRef::dead` and read by `ActoRef::is_gone`.
dead: AtomicBool,
/// Slot filled by `ActoRef::waker` and emptied (and woken) by `ActoRef::wake`.
waker: Mutex<Option<Waker>>,
}
impl From<&Straight> for Mapped {
fn from(s: &Straight) -> Self {
Mapped {
name: s.name.as_str(),
count: &s.count,
dead: &s.dead,
waker: &s.waker,
}
}
}
/// Raw-pointer view onto the fields of a `Straight` that is owned by another reference.
///
/// Built via `From<&Straight>`; being `Copy`, it is duplicated as-is when an already mapped
/// reference is mapped again in `ActoRef::contramap`.
#[derive(Clone, Copy)]
struct Mapped {
/// Points at the string data of `Straight::name`.
name: *const str,
/// Points at `Straight::count`.
count: *const AtomicUsize,
/// Points at `Straight::dead`.
dead: *const AtomicBool,
/// Points at `Straight::waker`.
waker: *const Mutex<Option<Waker>>,
}
impl<T, U> PartialEq<ActoRef<U>> for ActoRef<T> {
    /// Two references are equal when they carry the same `ActoId`, regardless of their
    /// message types.
    fn eq(&self, other: &ActoRef<U>) -> bool {
        let (lhs, rhs) = (self.0.id, other.0.id);
        lhs == rhs
    }
}
// Marker impl: `eq` compares `ActoId`s, which are themselves `Eq`, so equality of
// references is a full equivalence relation.
impl<T> Eq for ActoRef<T> {}
impl<T, U> PartialOrd<ActoRef<U>> for ActoRef<T> {
    /// Orders references by their `ActoId`, regardless of their message types.
    ///
    /// The ids are totally ordered, so the result is always `Some`.
    fn partial_cmp(&self, other: &ActoRef<U>) -> Option<std::cmp::Ordering> {
        let ordering = self.0.id.cmp(&other.0.id);
        Some(ordering)
    }
}
impl<T> Ord for ActoRef<T> {
    /// Total order of references, given by the order of their `ActoId`s.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let (lhs, rhs) = (&self.0.id, &other.0.id);
        Ord::cmp(lhs, rhs)
    }
}
impl<T> Hash for ActoRef<T> {
    /// Feeds only the `ActoId` into the hasher, matching the id-based `PartialEq`.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let id = &self.0.id;
        Hash::hash(id, state);
    }
}
impl<M> ActoRef<M> {
    /// Builds a reference that owns its bookkeeping state.
    ///
    /// The count starts at zero, the dead flag is cleared and no waker is stored; `sender`
    /// becomes the type-erased message sink.
    pub(crate) fn new(id: ActoId, name: SmolStr, sender: impl Sender<M> + 'static) -> Self {
        let state = Straight {
            name,
            count: AtomicUsize::new(0),
            dead: AtomicBool::new(false),
            waker: Mutex::new(None),
        };
        let inner = ActoRefInner {
            fields: Inner::Straight(state),
            id,
            sender,
        };
        Self(Arc::new(inner))
    }

    /// Returns the identifier stored in this reference.
    pub fn id(&self) -> ActoId {
        let inner = &self.0;
        inner.id
    }

    /// Returns the name of this reference; a blackhole reports a fixed placeholder name.
    pub fn name(&self) -> &str {
        let fields = &self.0.fields;
        match fields {
            Inner::Blackhole => "blackhole(acto/0)",
            // SAFETY: NOTE(review) — relies on the pointed-to `Straight` still being alive;
            // confirm that the sender of a mapped reference guarantees this.
            Inner::Mapped(shared) => unsafe { &*shared.name },
            Inner::Straight(own) => own.name.as_str(),
        }
    }

    /// Applies `f` to the counter of this reference and returns its result.
    ///
    /// A blackhole has no counter: `f` is not called and `0` is returned.
    #[inline(always)]
    fn with_count(&self, f: impl FnOnce(&AtomicUsize) -> usize) -> usize {
        let counter = match &self.0.fields {
            Inner::Straight(own) => &own.count,
            // SAFETY: NOTE(review) — see `name`; same liveness assumption for the pointee.
            Inner::Mapped(shared) => unsafe { &*shared.count },
            Inner::Blackhole => return 0,
        };
        f(counter)
    }

    /// Reads the current counter value (always `0` for a blackhole).
    pub(crate) fn get_count(&self) -> usize {
        let read = |counter: &AtomicUsize| counter.load(Ordering::SeqCst);
        self.with_count(read)
    }

    /// Sets the dead flag that `is_gone` reports; does nothing for a blackhole.
    pub(crate) fn dead(&self) {
        let flag = match &self.0.fields {
            Inner::Straight(own) => &own.dead,
            // SAFETY: NOTE(review) — see `name`; same liveness assumption for the pointee.
            Inner::Mapped(shared) => unsafe { &*shared.dead },
            Inner::Blackhole => return,
        };
        flag.store(true, Ordering::Release);
    }

    /// Stores `waker`, replacing any waker stored earlier; a blackhole discards it.
    pub(crate) fn waker(&self, waker: Waker) {
        let slot = match &self.0.fields {
            Inner::Straight(own) => &own.waker,
            // SAFETY: NOTE(review) — see `name`; same liveness assumption for the pointee.
            Inner::Mapped(shared) => unsafe { &*shared.waker },
            Inner::Blackhole => return,
        };
        *slot.lock() = Some(waker);
    }

    /// Removes the stored waker, if there is one, and wakes it.
    pub(crate) fn wake(&self) {
        let slot = match &self.0.fields {
            Inner::Straight(own) => &own.waker,
            // SAFETY: NOTE(review) — see `name`; same liveness assumption for the pointee.
            Inner::Mapped(shared) => unsafe { &*shared.waker },
            Inner::Blackhole => return,
        };
        // Take the waker in its own statement so that the lock guard is released before
        // the waker runs.
        let pending = slot.lock().take();
        if let Some(pending) = pending {
            pending.wake();
        }
    }

    /// Reports whether the dead flag has been set; a blackhole always answers `false`.
    pub fn is_gone(&self) -> bool {
        let flag = match &self.0.fields {
            Inner::Straight(own) => &own.dead,
            // SAFETY: NOTE(review) — see `name`; same liveness assumption for the pointee.
            Inner::Mapped(shared) => unsafe { &*shared.dead },
            Inner::Blackhole => return false,
        };
        flag.load(Ordering::Acquire)
    }
}
impl<M: Send + 'static> ActoRef<M> {
pub fn blackhole() -> Self {
Self(Arc::new(ActoRefInner {
fields: Inner::Blackhole,
id: ActoId::next(),
sender: BlackholeSender::new(),
}))
}
pub fn send(&self, msg: M) -> bool {
tracing::trace!(target = ?self, "send");
self.0.sender.send(msg)
}
pub async fn send_wait(&self, msg: M) -> bool {
tracing::trace!(target = ?self, "send_wait");
self.0.sender.send_wait(msg).await
}
pub fn contramap<M2: Send + 'static>(
&self,
f: impl Fn(M2) -> M + Send + Sync + 'static,
) -> ActoRef<M2> {
let fields = match &self.0.fields {
Inner::Straight(s) => Inner::Mapped(s.into()),
Inner::Mapped(x) => Inner::Mapped(*x),
Inner::Blackhole => Inner::Blackhole,
};
let count = self.with_count(|c| c.fetch_add(1, Ordering::SeqCst));
tracing::trace!(count, ?self, "contramap");
let orig = self.0.clone();
let inner = ActoRefInner {
fields,
id: orig.id,
sender: MappedSender::new(f, orig),
};
ActoRef(Arc::new(inner))
}
pub fn is_blackhole(&self) -> bool {
matches!(self.0.fields, Inner::Blackhole)
}
}
impl<M> Clone for ActoRef<M> {
fn clone(&self) -> Self {
let count = self.with_count(|c| c.fetch_add(1, Ordering::SeqCst));
tracing::trace!(count, ?self, "clone");
Self(self.0.clone())
}
}
impl<M> Drop for ActoRef<M> {
fn drop(&mut self) {
let count = self.with_count(|c| c.fetch_sub(1, Ordering::SeqCst));
tracing::trace!(count, ?self, "drop");
if count == 1 {
self.wake()
}
}
}
impl<M> Debug for ActoRef<M> {
    /// Formats the reference as `ActoRef(<name>)`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("ActoRef(")?;
        f.write_str(self.name())?;
        f.write_str(")")
    }
}