use std::{future::Future, sync::atomic::AtomicU64};
use async_channel::{Sender, WeakSender};
use crate::{
executor::Context,
message::{Envelope, Message},
};
/// Global counter that `Address::new` increments to obtain the `id` of each
/// newly created address.
static ADDRESS_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Lifecycle hooks for a type that is driven as an actor.
///
/// Both hooks come with a default implementation that finishes immediately
/// without doing anything, so implementors only override what they need.
pub trait Actor: Sized {
    /// Start-up hook. The default is a no-op.
    ///
    /// NOTE(review): when exactly this is invoked is decided by the executor,
    /// which is not visible from this file.
    fn starting(&mut self, _ctx: &Context<Self>) -> impl Future<Output = ()> + Send {
        async {}
    }

    /// Shut-down hook. The default is a no-op.
    ///
    /// NOTE(review): when exactly this is invoked is decided by the executor,
    /// which is not visible from this file.
    fn stopping(&mut self, _ctx: &Context<Self>) -> impl Future<Output = ()> + Send {
        async {}
    }
}
/// Implemented by an [`Actor`] once for every message type `M` it can process.
pub trait Handler<M: Message>: Actor {
    /// Processes a single `msg` with mutable access to the actor and shared
    /// access to its [`Context`].
    ///
    /// The returned future must be `Send`.
    fn handle(&mut self, msg: M, ctx: &Context<Self>) -> impl Future<Output = ()> + Send;
}
/// Strong handle used to send messages to an actor of type `A`.
///
/// Two addresses compare equal when their `id`s are equal; a clone keeps the
/// `id` of the address it was cloned from.
///
/// Note that `Debug` is derived, so the impl carries the usual derive-generated
/// `A: Debug` bound.
#[derive(Debug)]
pub struct Address<A> {
// Identifier taken from the global address counter when the address is created.
id: u64,
// Sending half of the channel that carries `Envelope<A>` values.
sender: Sender<Envelope<A>>,
}
/// Addresses are equal exactly when they carry the same `id`; the channel
/// sender itself takes no part in the comparison.
impl<A> PartialEq for Address<A> {
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (self.id, other.id);
        lhs == rhs
    }
}
// SAFETY: NOTE(review): these impls place no bound on `A`. They are only sound
// if `Sender<Envelope<A>>` may be moved to and shared between threads for every
// possible `A`, which depends on the definition of `Envelope` (not visible in
// this file) — TODO confirm, or drop the impls if the auto traits already apply.
unsafe impl<A> std::marker::Send for Address<A> {}
unsafe impl<A> std::marker::Sync for Address<A> {}
// `Unpin` is a safe trait; this makes `Address<A>` `Unpin` regardless of `A`.
impl<A> std::marker::Unpin for Address<A> {}
impl<A> Clone for Address<A> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
id: self.id,
}
}
}
impl<A> Address<A> {
    /// Wraps `sender` in a new address whose `id` is the next value of the
    /// global address counter.
    pub(crate) fn new(sender: Sender<Envelope<A>>) -> Self {
        use std::sync::atomic::Ordering;

        Self {
            id: ADDRESS_COUNTER.fetch_add(1, Ordering::Relaxed),
            sender,
        }
    }

    /// Creates a [`WeakAddress`] that carries this address' `id` and a
    /// downgraded version of its channel sender.
    pub fn downgrade(&self) -> WeakAddress<A> {
        WeakAddress::new(self.id, self.sender.downgrade())
    }

    /// Creates an address backed by a bounded channel of capacity `cap` whose
    /// receiving half is intentionally leaked, so it is never dropped.
    ///
    /// Only available with the `mocking` feature.
    #[cfg(feature = "mocking")]
    pub fn new_leak(cap: usize) -> Self {
        let (tx, rx) = async_channel::bounded::<Envelope<A>>(cap);
        // Leak the receiver on purpose so it lives for the rest of the program.
        Box::leak(Box::new(rx));
        Self::new(tx)
    }
}
impl<A> Address<A>
where
    A: 'static + Actor + Send,
{
    /// Packs `message` into an [`Envelope`] and awaits its hand-off to the
    /// underlying channel.
    ///
    /// Delivery is best-effort: any error reported by the channel is ignored.
    pub async fn send<M>(&self, message: M)
    where
        A: Handler<M>,
        M: Message,
    {
        let _ = self.sender.send(Envelope::pack(message)).await;
    }

    /// Packs `message` into an [`Envelope`] and tries to hand it to the
    /// underlying channel without awaiting.
    ///
    /// Delivery is best-effort: any error reported by the channel is ignored,
    /// in which case the message is silently dropped.
    pub fn try_send<M>(&self, message: M)
    where
        A: Handler<M>,
        M: Message,
    {
        let _ = self.sender.try_send(Envelope::pack(message));
    }
}
/// Weak counterpart of `Address`, produced by `Address::downgrade`.
///
/// Note that `Debug` is derived, so the impl carries the usual derive-generated
/// `A: Debug` bound.
#[derive(Debug)]
pub struct WeakAddress<A> {
// `id` of the address this weak address was created from.
id: u64,
// Weak sending half of the channel that carries `Envelope<A>` values.
sender: WeakSender<Envelope<A>>,
}
impl<A> Clone for WeakAddress<A> {
fn clone(&self) -> Self {
Self {
id: self.id,
sender: self.sender.clone(),
}
}
}
impl<A> WeakAddress<A> {
pub(crate) fn new(id: u64, sender: WeakSender<Envelope<A>>) -> Self {
Self { id, sender }
}
pub fn upgrade(&self) -> Option<Address<A>> {
let sender = self.sender.upgrade()?;
Some(Address::new(sender))
}
}
// SAFETY: NOTE(review): these impls place no bound on `A`. They are only sound
// if `WeakSender<Envelope<A>>` may be moved to and shared between threads for
// every possible `A`, which depends on the definition of `Envelope` (not
// visible in this file) — TODO confirm, or drop the impls if the auto traits
// already apply.
unsafe impl<A> std::marker::Send for WeakAddress<A> {}
unsafe impl<A> std::marker::Sync for WeakAddress<A> {}
// `Unpin` is a safe trait; this makes `WeakAddress<A>` `Unpin` regardless of `A`.
impl<A> std::marker::Unpin for WeakAddress<A> {}
#[cfg(test)]
mod test {
    use std::sync::Mutex;

    use crate::Executor;

    use super::*;

    /// Payload-free message used by the tests.
    struct Msg;

    /// Stateless actor used by the tests.
    struct Act;

    impl Actor for Act {}

    impl Handler<Msg> for Act {
        async fn handle(&mut self, _msg: Msg, _ctx: &Context<Self>) {}
    }

    /// A cloned address compares equal to the address it was cloned from.
    #[test]
    fn partial_eq_on_clone() {
        let (_executor, original) = Executor::new(Act);
        let duplicate = original.clone();
        assert!(original == duplicate);
    }

    /// Addresses obtained from two separate `Executor::new` calls differ.
    #[test]
    fn partial_eq_on_different_addrs() {
        let (_executor_1, first) = Executor::new(Act);
        let (_executor_2, second) = Executor::new(Act);
        assert!(first != second);
    }

    /// Every one of a thousand sequentially created addresses differs from
    /// all addresses created before it.
    #[test]
    fn partial_eq_on_a_thousand_different_addrs() {
        let mut seen: Vec<Address<Act>> = Vec::new();
        for _ in 0..1_000 {
            let (_executor, fresh) = Executor::new(Act);
            for earlier in &seen {
                assert!(earlier != &fresh);
            }
            seen.push(fresh);
        }
    }

    /// Addresses created concurrently on a thousand threads are pairwise
    /// different.
    #[test]
    fn partial_eq_on_a_thousand_different_threads() {
        const NUM_THREAD: usize = 1_000;

        let collected = Mutex::new(Vec::<Address<Act>>::new());
        std::thread::scope(|scope| {
            for _ in 0..NUM_THREAD {
                scope.spawn(|| {
                    let (_executor, address) = Executor::new(Act);
                    collected.lock().unwrap().push(address);
                });
            }
        });

        // All threads have been joined by the end of the scope, so the mutex
        // can be consumed to get the collected addresses.
        let addrs = collected.into_inner().unwrap();
        assert_eq!(addrs.len(), NUM_THREAD);

        for (i, left) in addrs.iter().enumerate() {
            for right in &addrs[i + 1..] {
                assert!(left != right);
            }
        }
    }
}